← назад к разделу

Фоновые горутины — планировщики, воркеры, outbox-relay — это место, где graceful shutdown чаще всего ломается молча. Горутина продолжает работу после получения сигнала остановки, пул базы данных закрывается раньше, чем транзакция завершилась. В результате при деплое часть операций уходит без commit — состояние расходится.

Разберём, как правильно организовать остановку фоновых горутин: что проверять, в каком порядке закрывать ресурсы и почему outbox-relay требует особого внимания.

Почему горутины не останавливаются сами

Горутина в Go — это независимый поток выполнения. Когда приложение получает SIGTERM, горутины не узнают об этом автоматически. Без явной передачи сигнала остановки горутина продолжает цикл бесконечно.

Стандартный инструмент — context.Context. Когда главный код вызывает cancel(), все горутины, которые смотрят на ctx.Done(), получают сигнал: «пора заканчивать».

Второй инструмент — sync.WaitGroup. Он позволяет главному коду подождать, пока все горутины завершат текущую итерацию, прежде чем закрывать соединения с базой данных.

Структура фоновой горутины

Любая фоновая горутина получает два аргумента: ctx context.Context и wg *sync.WaitGroup. Вызывающая сторона регистрирует горутину в WaitGroup до запуска.

type OutboxRelay struct {
    pool     *pgxpool.Pool
    queries  *db.Queries
    producer *kafka.Writer
    interval time.Duration
}

func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.processOneBatch(ctx); err != nil {
                slog.WarnContext(ctx, "outbox relay batch failed", "error", err)
            }
        }
    }
}

Несколько важных деталей:

  • defer wg.Done() — первым, чтобы счётчик освобождался при любом выходе из функции.
  • select проверяет ctx.Done() перед каждым тиком, а не внутри processOneBatch. Это значит «не начинать новую итерацию», а не «прервать текущую».
  • Если SIGTERM пришёл в момент выполнения processOneBatch, горутина дожидается его завершения и только на следующем витке видит <-ctx.Done() и выходит.

Частая ошибка — написать for { processOneBatch(ctx) } без select и ctx.Done(). Такая горутина игнорирует сигнал остановки, и WaitGroup никогда не освободится.

Порядок shutdown в main

Порядок закрытия ресурсов принципиален. Нарушение порядка приводит к панике: если закрыть пул базы данных до того, как горутины завершили транзакции, pgx паникует на попытке взять соединение из закрытого пула.

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
    if err != nil {
        slog.Error("pgxpool connect", "error", err)
        os.Exit(1)
    }

    relay := &OutboxRelay{
        pool:     pool,
        queries:  db.New(pool),
        producer: &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBroker), Topic: "order-events"},
        interval: 500 * time.Millisecond,
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go relay.Run(ctx, &wg)

    srv := &http.Server{Addr: cfg.Addr, Handler: buildRouter()}

    sigC := make(chan os.Signal, 1)
    signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
    defer signal.Stop(sigC)

    select {
    case sig := <-sigC:
        slog.Info("получили SIGTERM, начинаем graceful shutdown", "signal", sig.String())
    case err := <-func() chan error {
        c := make(chan error, 1)
        go func() { c <- srv.ListenAndServe() }()
        return c
    }():
        slog.Error("server error", "error", err)
        cancel()
        return
    }

    appState.SetNotReady()    // readiness → 503 первым

    cancel()                  // сигнал горутинам: новых итераций не начинать

    shutCtx, shutCancel := context.WithTimeout(context.Background(), 25*time.Second)
    defer shutCancel()
    if err := srv.Shutdown(shutCtx); err != nil {
        slog.Error("http shutdown", "error", err)
    }

    wg.Wait()                 // дождаться завершения всех горутин

    if err := producer.Close(); err != nil {
        slog.Error("kafka writer close", "error", err)
    }
    pool.Close()              // пул — последним
    slog.Info("graceful shutdown завершён")
}

Правило простое: pool.Close() всегда строго после wg.Wait().

Outbox-relay: почему транзакцию нельзя прерывать

Outbox-relay — горутина, которая забирает события из таблицы outbox_event и публикует их в Kafka. Её особенность: незавершённая транзакция означает потенциальный дубль при публикации.

Поэтому внутри processOneBatch транзакция открывается на context.Background(), а не на родительском ctx. Даже если ctx уже отменён (пришёл SIGTERM), транзакция доводится до commit или rollback.

func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    tx, err := r.pool.Begin(context.Background()) // отдельный ctx для транзакции
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(context.Background())

    qtx := r.queries.WithTx(tx)

    events, err := qtx.LockOutboxBatch(context.Background(), db.LockOutboxBatchParams{
        Limit: 50,
    })
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }
    if len(events) == 0 {
        return nil
    }

    for _, e := range events {
        msg := kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
        }
        if err := r.producer.WriteMessages(context.Background(), msg); err != nil {
            return fmt.Errorf("publish event %s: %w", e.ID, err)
        }
        if err := qtx.MarkDispatched(context.Background(), e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    if err := tx.Commit(context.Background()); err != nil {
        return fmt.Errorf("commit outbox batch: %w", err)
    }
    return nil
}

SQL-запрос для блокировки строк (sqlc):

-- name: LockOutboxBatch :many
SELECT id, aggregate_id, topic, payload
FROM outbox_event
WHERE dispatched_at IS NULL
ORDER BY created_at
LIMIT @limit
FOR UPDATE SKIP LOCKED;

FOR UPDATE SKIP LOCKED решает проблему параллельного запуска при rolling update. Если два экземпляра приложения запущены одновременно, они не берут одни и те же строки: каждый берёт свои и не ждёт других. Когда старый экземпляр завершает пачку и commit, строки помечены dispatched_at IS NOT NULL — новый их пропустит. Если старый упал без commit — строки разблокированы, новый подхватит.

Для обычных горутин без критичных транзакций (например, удаление устаревших записей) можно передавать ctx напрямую: прерывание допустимо, следующий запуск доделает.

Несколько горутин — один WaitGroup

Если сервис запускает несколько фоновых задач, все они регистрируются в одном WaitGroup:

var wg sync.WaitGroup

wg.Add(1)
go outboxRelay.Run(ctx, &wg)

wg.Add(1)
go productConsumer.Run(ctx, &wg)

wg.Add(1)
go expiredOrderCleaner.Run(ctx, &wg)

// при shutdown — один общий вызов
cancel()
wg.Wait()
pool.Close()

Все горутины завершаются параллельно, shutdown ждёт медленнейшей. Если OutboxRelay доводит пачку за 3 секунды, а ProductConsumer — за 8 секунд, суммарное ожидание 8 секунд, а не 11.

Бюджет времени

У graceful shutdown есть общий лимит — обычно 60 секунд, из которых Kubernetes оставляет около 10 секунд на preStop. Фоновые горутины должны укладываться в ~20 секунд.

Если пачка из 50 событий занимает слишком долго — уменьшите размер пачки до 10–15 событий. Увеличивать бюджет не нужно: пачка должна быть маленькой по дизайну.

Примерная раскладка:

preStop sleep            10s
http.Server.Shutdown    ≤25s  (параллельно с cancel горутин)
goroutines WaitGroup    ≤20s
pool.Close               ~1s
────────────────────────────
итого                   ≤56s < 60s

На практике HTTP-drain заканчивается раньше, чем outbox завершает пачку — они частично перекрываются.

Коротко

  • Горутины не останавливаются сами при SIGTERM — им нужен context.Context с cancel().
  • ctx.Done() проверяется в select перед каждым тиком: «не начинать новую итерацию», не прерывать текущую.
  • defer wg.Done() — первой строкой в горутине, wg.Add(1) — до go в вызывающем коде.
  • pool.Close() строго после wg.Wait() — иначе паника на открытой транзакции.
  • Внутри критичной секции outbox-relay транзакция открывается на context.Background(), не на отменённом ctx.
  • FOR UPDATE SKIP LOCKED защищает от двойной обработки одних строк при rolling update.
  • Несколько горутин — один общий WaitGroup, завершаются параллельно.
  • Размер пачки держите маленьким: фоновые горутины должны укладываться в ~20 секунд.

Что почитать дальше

  • HTTP drain в Go — http.Server.Shutdown и координация с preStop.
  • БД и persistence при shutdown — порядок pool.Close() и транзакции.
  • Kafka shutdown в Go — writer.Close() и commit offset при отмене контекста.
  • Идемпотентность при shutdown — защита от дублей при SIGTERM.
  • Kubernetes и graceful shutdown — terminationGracePeriodSeconds, preStop, rolling update.