Фоновые горутины — планировщики, воркеры, outbox-relay — это место, где graceful shutdown чаще всего ломается молча. Горутина продолжает работу после получения сигнала остановки, пул базы данных закрывается раньше, чем транзакция завершилась. В результате при деплое часть операций уходит без commit — состояние расходится.
Разберём, как правильно организовать остановку фоновых горутин: что проверять, в каком порядке закрывать ресурсы и почему outbox-relay требует особого внимания.
Почему горутины не останавливаются сами
Горутина в Go — это независимый поток выполнения. Когда приложение получает SIGTERM, горутины не узнают об этом автоматически. Без явной передачи сигнала остановки горутина продолжает цикл бесконечно.
Стандартный инструмент — context.Context. Когда главный код вызывает cancel(), все горутины, которые смотрят на ctx.Done(), получают сигнал: «пора заканчивать».
Второй инструмент — sync.WaitGroup. Он позволяет главному коду подождать, пока все горутины завершат текущую итерацию, прежде чем закрывать соединения с базой данных.
Структура фоновой горутины
Любая фоновая горутина получает два аргумента: ctx context.Context и wg *sync.WaitGroup. Вызывающая сторона регистрирует горутину в WaitGroup до запуска.
type OutboxRelay struct {
pool *pgxpool.Pool
queries *db.Queries
producer *kafka.Writer
interval time.Duration
}
func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.processOneBatch(ctx); err != nil {
slog.WarnContext(ctx, "outbox relay batch failed", "error", err)
}
}
}
}
Несколько важных деталей:
defer wg.Done()— первым, чтобы счётчик освобождался при любом выходе из функции.selectпроверяетctx.Done()перед каждым тиком, а не внутриprocessOneBatch. Это значит «не начинать новую итерацию», а не «прервать текущую».- Если
SIGTERMпришёл в момент выполненияprocessOneBatch, горутина дожидается его завершения и только на следующем витке видит<-ctx.Done()и выходит.
Частая ошибка — написать for { processOneBatch(ctx) } без select и ctx.Done(). Такая горутина игнорирует сигнал остановки, и WaitGroup никогда не освободится.
Порядок shutdown в main
Порядок закрытия ресурсов принципиален. Нарушение порядка приводит к панике: если закрыть пул базы данных до того, как горутины завершили транзакции, pgx паникует на попытке взять соединение из закрытого пула.
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
if err != nil {
slog.Error("pgxpool connect", "error", err)
os.Exit(1)
}
relay := &OutboxRelay{
pool: pool,
queries: db.New(pool),
producer: &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBroker), Topic: "order-events"},
interval: 500 * time.Millisecond,
}
var wg sync.WaitGroup
wg.Add(1)
go relay.Run(ctx, &wg)
srv := &http.Server{Addr: cfg.Addr, Handler: buildRouter()}
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(sigC)
select {
case sig := <-sigC:
slog.Info("получили SIGTERM, начинаем graceful shutdown", "signal", sig.String())
case err := <-func() chan error {
c := make(chan error, 1)
go func() { c <- srv.ListenAndServe() }()
return c
}():
slog.Error("server error", "error", err)
cancel()
return
}
appState.SetNotReady() // readiness → 503 первым
cancel() // сигнал горутинам: новых итераций не начинать
shutCtx, shutCancel := context.WithTimeout(context.Background(), 25*time.Second)
defer shutCancel()
if err := srv.Shutdown(shutCtx); err != nil {
slog.Error("http shutdown", "error", err)
}
wg.Wait() // дождаться завершения всех горутин
if err := producer.Close(); err != nil {
slog.Error("kafka writer close", "error", err)
}
pool.Close() // пул — последним
slog.Info("graceful shutdown завершён")
}
Правило простое: pool.Close() всегда строго после wg.Wait().
Outbox-relay: почему транзакцию нельзя прерывать
Outbox-relay — горутина, которая забирает события из таблицы outbox_event и публикует их в Kafka. Её особенность: незавершённая транзакция означает потенциальный дубль при публикации.
Поэтому внутри processOneBatch транзакция открывается на context.Background(), а не на родительском ctx. Даже если ctx уже отменён (пришёл SIGTERM), транзакция доводится до commit или rollback.
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
tx, err := r.pool.Begin(context.Background()) // отдельный ctx для транзакции
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(context.Background())
qtx := r.queries.WithTx(tx)
events, err := qtx.LockOutboxBatch(context.Background(), db.LockOutboxBatchParams{
Limit: 50,
})
if err != nil {
return fmt.Errorf("lock outbox batch: %w", err)
}
if len(events) == 0 {
return nil
}
for _, e := range events {
msg := kafka.Message{
Key: []byte(e.AggregateID),
Value: e.Payload,
}
if err := r.producer.WriteMessages(context.Background(), msg); err != nil {
return fmt.Errorf("publish event %s: %w", e.ID, err)
}
if err := qtx.MarkDispatched(context.Background(), e.ID); err != nil {
return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
}
}
if err := tx.Commit(context.Background()); err != nil {
return fmt.Errorf("commit outbox batch: %w", err)
}
return nil
}
SQL-запрос для блокировки строк (sqlc):
-- name: LockOutboxBatch :many
SELECT id, aggregate_id, topic, payload
FROM outbox_event
WHERE dispatched_at IS NULL
ORDER BY created_at
LIMIT @limit
FOR UPDATE SKIP LOCKED;
FOR UPDATE SKIP LOCKED решает проблему параллельного запуска при rolling update. Если два экземпляра приложения запущены одновременно, они не берут одни и те же строки: каждый берёт свои и не ждёт других. Когда старый экземпляр завершает пачку и commit, строки помечены dispatched_at IS NOT NULL — новый их пропустит. Если старый упал без commit — строки разблокированы, новый подхватит.
Для обычных горутин без критичных транзакций (например, удаление устаревших записей) можно передавать ctx напрямую: прерывание допустимо, следующий запуск доделает.
Несколько горутин — один WaitGroup
Если сервис запускает несколько фоновых задач, все они регистрируются в одном WaitGroup:
var wg sync.WaitGroup
wg.Add(1)
go outboxRelay.Run(ctx, &wg)
wg.Add(1)
go productConsumer.Run(ctx, &wg)
wg.Add(1)
go expiredOrderCleaner.Run(ctx, &wg)
// при shutdown — один общий вызов
cancel()
wg.Wait()
pool.Close()
Все горутины завершаются параллельно, shutdown ждёт медленнейшей. Если OutboxRelay доводит пачку за 3 секунды, а ProductConsumer — за 8 секунд, суммарное ожидание 8 секунд, а не 11.
Бюджет времени
У graceful shutdown есть общий лимит — обычно 60 секунд, из которых Kubernetes оставляет около 10 секунд на preStop. Фоновые горутины должны укладываться в ~20 секунд.
Если пачка из 50 событий занимает слишком долго — уменьшите размер пачки до 10–15 событий. Увеличивать бюджет не нужно: пачка должна быть маленькой по дизайну.
Примерная раскладка:
preStop sleep 10s
http.Server.Shutdown ≤25s (параллельно с cancel горутин)
goroutines WaitGroup ≤20s
pool.Close ~1s
────────────────────────────
итого ≤56s < 60s
На практике HTTP-drain заканчивается раньше, чем outbox завершает пачку — они частично перекрываются.
Коротко
- Горутины не останавливаются сами при
SIGTERM— им нуженcontext.Contextсcancel(). ctx.Done()проверяется вselectперед каждым тиком: «не начинать новую итерацию», не прерывать текущую.defer wg.Done()— первой строкой в горутине,wg.Add(1)— доgoв вызывающем коде.pool.Close()строго послеwg.Wait()— иначе паника на открытой транзакции.- Внутри критичной секции outbox-relay транзакция открывается на
context.Background(), не на отменённомctx. FOR UPDATE SKIP LOCKEDзащищает от двойной обработки одних строк при rolling update.- Несколько горутин — один общий
WaitGroup, завершаются параллельно. - Размер пачки держите маленьким: фоновые горутины должны укладываться в ~20 секунд.
Что почитать дальше
- HTTP drain в Go —
http.Server.Shutdownи координация с preStop. - БД и persistence при shutdown — порядок
pool.Close()и транзакции. - Kafka shutdown в Go —
writer.Close()и commit offset при отмене контекста. - Идемпотентность при shutdown — защита от дублей при SIGTERM.
- Kubernetes и graceful shutdown —
terminationGracePeriodSeconds, preStop, rolling update.