Опирается на правила:
R-SHUT-SCHED-1…R-SHUT-SCHED-3иR-SHUT-SCHED-X1из Graceful Shutdown Style Guide → раздел 5. Scheduled / async / outbox.
Важно знать
context.Context— единственный сигнал остановки горутине;ctx.Done()проверяется перед каждой новой итерацией, не прерывает текущую.sync.WaitGroupобязателен для каждой фоновой горутины — shutdown ждётwg.Wait()до закрытия пула.pool.Close()вызывается строго послеwg.Wait()— иначе pgx паникует на попытке взять соединение из закрытого пула.- Критичная секция транзакции использует
context.Background(), не отменённый родительский ctx — транзакция должна дойти до commit/rollback даже после SIGTERM.- Outbox-relay завершает текущий batch (
FOR UPDATE SKIP LOCKEDатомарно) и выходит; новый pod подхватывает не залоченные строки.for { ... }безctx.Done()— горутина игнорирует SIGTERM, WaitGroup никогда не освободится.- Total budget 60s — scheduler/async ≤ 20s в рамках cumulative бюджета (R-SHUT-OBS-1).
Фоновые горутины в Go — планировщики, воркеры, outbox-relay — это место, где graceful shutdown чаще всего ломается молча. Горутина без проверки ctx.Done() продолжает работу после SIGTERM; WaitGroup без Wait() позволяет pool.Close() выполниться раньше транзакции. В результате при деплое часть операций уходит без commit или без side-effect — состояние расходится.
UCP формулирует три требования: горутина завершает текущую итерацию, не начинает новую (R-SHUT-SCHED-1); долгий каскад держит WaitGroup до конца критичной секции (R-SHUT-SCHED-2); outbox-relay доводит batch, проверяет ctx.Done() перед следующим тиком (R-SHUT-SCHED-3).
Структура фоновой горутины
Любая фоновая горутина получает ctx context.Context и wg *sync.WaitGroup. Вызывающий сторона регистрирует горутину в WaitGroup до запуска.
// internal/scheduler/outbox_relay.go
type OutboxRelay struct {
pool *pgxpool.Pool
queries *db.Queries
producer *kafka.Writer
interval time.Duration
}
func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.processOneBatch(ctx); err != nil {
slog.WarnContext(ctx, "outbox relay batch failed", "error", err)
}
}
}
}
defer wg.Done() — первым, чтобы горутина освобождала счётчик при любом выходе. select с ctx.Done() проверяется перед каждым тиком, не внутри processOneBatch — это значит «не начинать новую итерацию», а не «прервать текущую». Если SIGTERM пришёл в момент processOneBatch, горутина дожидается его завершения и только потом выходит через <-ctx.Done() на следующем витке.
Запуск и регистрация в main
// cmd/order-service/main.go
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
if err != nil {
slog.Error("pgxpool connect", "error", err)
os.Exit(1)
}
appState := health.NewState()
queries := db.New(pool)
producer := &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBroker), Topic: "order-events"}
relay := &OutboxRelay{
pool: pool,
queries: queries,
producer: producer,
interval: 500 * time.Millisecond,
}
var wg sync.WaitGroup
wg.Add(1)
go relay.Run(ctx, &wg)
srv := &http.Server{Addr: cfg.Addr, Handler: buildRouter(appState, queries)}
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(sigC)
errC := make(chan error, 1)
go func() { errC <- srv.ListenAndServe() }()
select {
case sig := <-sigC:
slog.Info("получили SIGTERM, начинаем graceful shutdown", "signal", sig.String())
case err := <-errC:
slog.Error("server error", "error", err)
cancel()
return
}
appState.SetNotReady() // R-SHUT-CFG-3 — readiness → 503 первым
cancel() // сигнал всем горутинам: новых итераций не начинать
shutCtx, shutCancel := context.WithTimeout(context.Background(), 25*time.Second)
defer shutCancel()
if err := srv.Shutdown(shutCtx); err != nil {
slog.Error("http shutdown", "error", err)
}
wg.Wait() // R-SHUT-SCHED-1 — дождаться завершения горутин
if err := producer.Close(); err != nil {
slog.Error("kafka writer close", "error", err)
}
pool.Close() // R-SHUT-DB-1 — пул последним
slog.Info("graceful shutdown завершён")
}
Порядок фиксирован: сначала readiness в false, потом cancel(), потом srv.Shutdown, потом wg.Wait(), потом pool.Close(). Менять порядок нельзя — pool.Close() раньше wg.Wait() даёт панику в транзакции.
Outbox-relay: processOneBatch с FOR UPDATE SKIP LOCKED
R-SHUT-SCHED-3: критичная секция внутри processOneBatch работает до завершения независимо от состояния ctx. Транзакция открывается на context.Background() — не на отменённом ctx.
// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
tx, err := r.pool.Begin(context.Background()) // R-SHUT-SCHED-2 — отдельный ctx для tx
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(context.Background())
qtx := r.queries.WithTx(tx)
events, err := qtx.LockOutboxBatch(context.Background(), db.LockOutboxBatchParams{
Limit: 50,
})
if err != nil {
return fmt.Errorf("lock outbox batch: %w", err)
}
if len(events) == 0 {
return nil
}
for _, e := range events {
msg := kafka.Message{
Key: []byte(e.AggregateID),
Value: e.Payload,
}
if err := r.producer.WriteMessages(context.Background(), msg); err != nil {
return fmt.Errorf("publish event %s: %w", e.ID, err)
}
if err := qtx.MarkDispatched(context.Background(), e.ID); err != nil {
return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
}
}
if err := tx.Commit(context.Background()); err != nil {
return fmt.Errorf("commit outbox batch: %w", err)
}
return nil
}
SQL-запрос для LockOutboxBatch (sqlc):
-- name: LockOutboxBatch :many
SELECT id, aggregate_id, topic, payload
FROM outbox_event
WHERE dispatched_at IS NULL
ORDER BY created_at
LIMIT @limit
FOR UPDATE SKIP LOCKED;
FOR UPDATE SKIP LOCKED гарантирует: если на SIGTERM два pod'а запущены параллельно (rolling update), они не берут одни и те же строки. Когда старый pod завершает batch и commit, строки помечены dispatched_at IS NOT NULL — новый pod их пропустит. Если старый pod упал без commit — строки разблокированы, новый подхватит.
Несколько горутин — общий WaitGroup
Если сервис запускает несколько фоновых задач (outbox-relay, консьюмер Kafka, periodic-cleaner), все они регистрируются в одном WaitGroup:
// cmd/order-service/main.go — несколько горутин
var wg sync.WaitGroup
outboxRelay := NewOutboxRelay(pool, queries, producer)
wg.Add(1)
go outboxRelay.Run(ctx, &wg)
productConsumer := NewProductConsumer(kafkaReader, queries)
wg.Add(1)
go productConsumer.Run(ctx, &wg)
cleaner := NewExpiredOrderCleaner(pool, queries)
wg.Add(1)
go cleaner.Run(ctx, &wg)
// на shutdown — один общий wg.Wait()
cancel()
wg.Wait()
pool.Close()
Все горутины завершаются параллельно, shutdown ждёт медленнейшей. Если OutboxRelay доводит batch за 3s, а ProductConsumer — за 8s, суммарное ожидание 8s, не 11s.
Пример: periodic-cleaner для устаревших Order
Горутина без IO-batch — проще, но структура та же:
// internal/scheduler/order_cleaner.go
type ExpiredOrderCleaner struct {
queries *db.Queries
interval time.Duration
before time.Duration
}
func (c *ExpiredOrderCleaner) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
deleted, err := c.queries.DeleteExpiredOrders(ctx, db.DeleteExpiredOrdersParams{
Before: pgtype.Timestamptz{Time: time.Now().Add(-c.before), Valid: true},
})
if err != nil {
slog.WarnContext(ctx, "delete expired orders", "error", err)
continue
}
if deleted > 0 {
slog.InfoContext(ctx, "expired orders deleted", "count", deleted)
}
}
}
}
Здесь ctx передаётся в DeleteExpiredOrders напрямую — запрос не критичный, его прерывание при отмене контекста допустимо (ничего не потеряем, следующий pod доделает). Это отличие от outbox-relay, где незавершённый batch означает потенциальный дубль при публикации.
Бюджет для outbox и scheduler
R-SHUT-OBS-1: outbox-relay и прочие горутины занимают ≤ 20s в cumulative бюджете 60s. Если batch из 50 событий занимает 5s (HTTP-вызовы к Kafka медленные) — уместиться реально. Если занимает 25s — уменьшить batch до 10-15 событий, не увеличивать бюджет.
preStop sleep 10s
http.Server.Shutdown ≤25s (параллельно с cancel горутин)
goroutines WaitGroup ≤20s
pool.Close ~1s
─────────────────────────
cumulative ≤56s < 60s
http.Server.Shutdown и wg.Wait() выполняются последовательно в коде выше, но фактически HTTP дрейн заканчивается раньше, чем outbox завершает batch — в типичном сценарии они перекрываются во времени (preStop даёт 10s до SIGTERM, HTTP дожимается в первые секунды после cancel, outbox доводит batch в те же секунды).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
for { processOneBatch(ctx) } без ctx.Done() | R-SHUT-SCHED-X1 | select { case <-ctx.Done(): return; case <-ticker.C: } |
Горутина без wg.Add(1) / wg.Done() | R-SHUT-SCHED-1 | wg.Add(1) до go, defer wg.Done() первым в горутине |
pool.Close() до wg.Wait() | R-SHUT-DB-X1 | wg.Wait() строго перед pool.Close() |
Транзакция в критичной секции на отменённом ctx | R-SHUT-SCHED-2 | context.Background() для pool.Begin и tx.Commit |
cancel() без wg.Wait() перед завершением main | R-SHUT-SCHED-X1 | явный wg.Wait() в shutdown-последовательности |
| Batch размером 100+ событий без замера времени | R-SHUT-OBS-1 | замерить, уменьшить batch до укладки в ≤ 20s |
slog.Error на нормальное завершение горутины | R-SHUT-OBS-X1 | slog.Info для штатного выхода |
Куда дальше
- Бюджеты и observability — cumulative 60s бюджет, метрика
app_shutdown_duration_seconds. - БД и persistence — порядок
pool.Close()и транзакции на shutdown. - HTTP drain —
http.Server.Shutdownи preStop координация. - Идемпотентность in-flight — retry-safety для outbox и Kafka-handler.
- Kafka shutdown —
writer.Close()и commit offset при отмене контекста. - Kubernetes —
terminationGracePeriodSeconds, preStop, rolling update.