Опирается на правила: R-SHUT-SCHED-1R-SHUT-SCHED-3 и R-SHUT-SCHED-X1 из Graceful Shutdown Style Guide → раздел 5. Scheduled / async / outbox.

Важно знать

  • context.Context — единственный сигнал остановки горутине; ctx.Done() проверяется перед каждой новой итерацией, не прерывает текущую.
  • sync.WaitGroup обязателен для каждой фоновой горутины — shutdown ждёт wg.Wait() до закрытия пула.
  • pool.Close() вызывается строго после wg.Wait() — иначе pgx паникует на попытке взять соединение из закрытого пула.
  • Критичная секция транзакции использует context.Background(), не отменённый родительский ctx — транзакция должна дойти до commit/rollback даже после SIGTERM.
  • Outbox-relay завершает текущий batch (FOR UPDATE SKIP LOCKED атомарно) и выходит; новый pod подхватывает не залоченные строки.
  • for { ... } без ctx.Done() — горутина игнорирует SIGTERM, WaitGroup никогда не освободится.
  • Total budget 60s — scheduler/async ≤ 20s в рамках cumulative бюджета (R-SHUT-OBS-1).

Фоновые горутины в Go — планировщики, воркеры, outbox-relay — это место, где graceful shutdown чаще всего ломается молча. Горутина без проверки ctx.Done() продолжает работу после SIGTERM; WaitGroup без Wait() позволяет pool.Close() выполниться раньше транзакции. В результате при деплое часть операций уходит без commit или без side-effect — состояние расходится.

UCP формулирует три требования: горутина завершает текущую итерацию, не начинает новую (R-SHUT-SCHED-1); долгий каскад держит WaitGroup до конца критичной секции (R-SHUT-SCHED-2); outbox-relay доводит batch, проверяет ctx.Done() перед следующим тиком (R-SHUT-SCHED-3).

Структура фоновой горутины

Любая фоновая горутина получает ctx context.Context и wg *sync.WaitGroup. Вызывающий сторона регистрирует горутину в WaitGroup до запуска.

// internal/scheduler/outbox_relay.go
type OutboxRelay struct {
    pool     *pgxpool.Pool
    queries  *db.Queries
    producer *kafka.Writer
    interval time.Duration
}

func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.processOneBatch(ctx); err != nil {
                slog.WarnContext(ctx, "outbox relay batch failed", "error", err)
            }
        }
    }
}

defer wg.Done() — первым, чтобы горутина освобождала счётчик при любом выходе. select с ctx.Done() проверяется перед каждым тиком, не внутри processOneBatch — это значит «не начинать новую итерацию», а не «прервать текущую». Если SIGTERM пришёл в момент processOneBatch, горутина дожидается его завершения и только потом выходит через <-ctx.Done() на следующем витке.

Запуск и регистрация в main

// cmd/order-service/main.go
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
    if err != nil {
        slog.Error("pgxpool connect", "error", err)
        os.Exit(1)
    }

    appState := health.NewState()
    queries := db.New(pool)
    producer := &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBroker), Topic: "order-events"}

    relay := &OutboxRelay{
        pool:     pool,
        queries:  queries,
        producer: producer,
        interval: 500 * time.Millisecond,
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go relay.Run(ctx, &wg)

    srv := &http.Server{Addr: cfg.Addr, Handler: buildRouter(appState, queries)}

    sigC := make(chan os.Signal, 1)
    signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
    defer signal.Stop(sigC)

    errC := make(chan error, 1)
    go func() { errC <- srv.ListenAndServe() }()

    select {
    case sig := <-sigC:
        slog.Info("получили SIGTERM, начинаем graceful shutdown", "signal", sig.String())
    case err := <-errC:
        slog.Error("server error", "error", err)
        cancel()
        return
    }

    appState.SetNotReady()         // R-SHUT-CFG-3 — readiness → 503 первым
    cancel()                       // сигнал всем горутинам: новых итераций не начинать

    shutCtx, shutCancel := context.WithTimeout(context.Background(), 25*time.Second)
    defer shutCancel()
    if err := srv.Shutdown(shutCtx); err != nil {
        slog.Error("http shutdown", "error", err)
    }

    wg.Wait()                      // R-SHUT-SCHED-1 — дождаться завершения горутин

    if err := producer.Close(); err != nil {
        slog.Error("kafka writer close", "error", err)
    }
    pool.Close()                   // R-SHUT-DB-1 — пул последним
    slog.Info("graceful shutdown завершён")
}

Порядок фиксирован: сначала readiness в false, потом cancel(), потом srv.Shutdown, потом wg.Wait(), потом pool.Close(). Менять порядок нельзя — pool.Close() раньше wg.Wait() даёт панику в транзакции.

Outbox-relay: processOneBatch с FOR UPDATE SKIP LOCKED

R-SHUT-SCHED-3: критичная секция внутри processOneBatch работает до завершения независимо от состояния ctx. Транзакция открывается на context.Background() — не на отменённом ctx.

// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    tx, err := r.pool.Begin(context.Background()) // R-SHUT-SCHED-2 — отдельный ctx для tx
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(context.Background())

    qtx := r.queries.WithTx(tx)

    events, err := qtx.LockOutboxBatch(context.Background(), db.LockOutboxBatchParams{
        Limit: 50,
    })
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }
    if len(events) == 0 {
        return nil
    }

    for _, e := range events {
        msg := kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
        }
        if err := r.producer.WriteMessages(context.Background(), msg); err != nil {
            return fmt.Errorf("publish event %s: %w", e.ID, err)
        }
        if err := qtx.MarkDispatched(context.Background(), e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    if err := tx.Commit(context.Background()); err != nil {
        return fmt.Errorf("commit outbox batch: %w", err)
    }
    return nil
}

SQL-запрос для LockOutboxBatch (sqlc):

-- name: LockOutboxBatch :many
SELECT id, aggregate_id, topic, payload
FROM outbox_event
WHERE dispatched_at IS NULL
ORDER BY created_at
LIMIT @limit
FOR UPDATE SKIP LOCKED;

FOR UPDATE SKIP LOCKED гарантирует: если на SIGTERM два pod'а запущены параллельно (rolling update), они не берут одни и те же строки. Когда старый pod завершает batch и commit, строки помечены dispatched_at IS NOT NULL — новый pod их пропустит. Если старый pod упал без commit — строки разблокированы, новый подхватит.

Несколько горутин — общий WaitGroup

Если сервис запускает несколько фоновых задач (outbox-relay, консьюмер Kafka, periodic-cleaner), все они регистрируются в одном WaitGroup:

// cmd/order-service/main.go — несколько горутин
var wg sync.WaitGroup

outboxRelay := NewOutboxRelay(pool, queries, producer)
wg.Add(1)
go outboxRelay.Run(ctx, &wg)

productConsumer := NewProductConsumer(kafkaReader, queries)
wg.Add(1)
go productConsumer.Run(ctx, &wg)

cleaner := NewExpiredOrderCleaner(pool, queries)
wg.Add(1)
go cleaner.Run(ctx, &wg)

// на shutdown — один общий wg.Wait()
cancel()
wg.Wait()
pool.Close()

Все горутины завершаются параллельно, shutdown ждёт медленнейшей. Если OutboxRelay доводит batch за 3s, а ProductConsumer — за 8s, суммарное ожидание 8s, не 11s.

Пример: periodic-cleaner для устаревших Order

Горутина без IO-batch — проще, но структура та же:

// internal/scheduler/order_cleaner.go
type ExpiredOrderCleaner struct {
    queries  *db.Queries
    interval time.Duration
    before   time.Duration
}

func (c *ExpiredOrderCleaner) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            deleted, err := c.queries.DeleteExpiredOrders(ctx, db.DeleteExpiredOrdersParams{
                Before: pgtype.Timestamptz{Time: time.Now().Add(-c.before), Valid: true},
            })
            if err != nil {
                slog.WarnContext(ctx, "delete expired orders", "error", err)
                continue
            }
            if deleted > 0 {
                slog.InfoContext(ctx, "expired orders deleted", "count", deleted)
            }
        }
    }
}

Здесь ctx передаётся в DeleteExpiredOrders напрямую — запрос не критичный, его прерывание при отмене контекста допустимо (ничего не потеряем, следующий pod доделает). Это отличие от outbox-relay, где незавершённый batch означает потенциальный дубль при публикации.

Бюджет для outbox и scheduler

R-SHUT-OBS-1: outbox-relay и прочие горутины занимают ≤ 20s в cumulative бюджете 60s. Если batch из 50 событий занимает 5s (HTTP-вызовы к Kafka медленные) — уместиться реально. Если занимает 25s — уменьшить batch до 10-15 событий, не увеличивать бюджет.

preStop sleep        10s
http.Server.Shutdown ≤25s   (параллельно с cancel горутин)
goroutines WaitGroup ≤20s
pool.Close           ~1s
─────────────────────────
cumulative           ≤56s < 60s

http.Server.Shutdown и wg.Wait() выполняются последовательно в коде выше, но фактически HTTP дрейн заканчивается раньше, чем outbox завершает batch — в типичном сценарии они перекрываются во времени (preStop даёт 10s до SIGTERM, HTTP дожимается в первые секунды после cancel, outbox доводит batch в те же секунды).

Что запрещено

АнтипаттернПравилоЧто взамен
for { processOneBatch(ctx) } без ctx.Done()R-SHUT-SCHED-X1select { case <-ctx.Done(): return; case <-ticker.C: }
Горутина без wg.Add(1) / wg.Done()R-SHUT-SCHED-1wg.Add(1) до go, defer wg.Done() первым в горутине
pool.Close() до wg.Wait()R-SHUT-DB-X1wg.Wait() строго перед pool.Close()
Транзакция в критичной секции на отменённом ctxR-SHUT-SCHED-2context.Background() для pool.Begin и tx.Commit
cancel() без wg.Wait() перед завершением mainR-SHUT-SCHED-X1явный wg.Wait() в shutdown-последовательности
Batch размером 100+ событий без замера времениR-SHUT-OBS-1замерить, уменьшить batch до укладки в ≤ 20s
slog.Error на нормальное завершение горутиныR-SHUT-OBS-X1slog.Info для штатного выхода

Куда дальше

  • Бюджеты и observability — cumulative 60s бюджет, метрика app_shutdown_duration_seconds.
  • БД и persistence — порядок pool.Close() и транзакции на shutdown.
  • HTTP drain — http.Server.Shutdown и preStop координация.
  • Идемпотентность in-flight — retry-safety для outbox и Kafka-handler.
  • Kafka shutdown — writer.Close() и commit offset при отмене контекста.
  • Kubernetes — terminationGracePeriodSeconds, preStop, rolling update.