Опирается на правила:
R-SHUT-DB-1…R-SHUT-DB-3иR-SHUT-DB-X1из Graceful Shutdown Style Guide → раздел 4. БД и persistence.
Важно знать
pgxpool.Pool.Close()вызывается последним — послеWaitGroup.Wait()по всем горутинам (consumer, scheduler, outbox-relay).- Не закрывать pool в отдельном
goroutineсdeferбез синхронизации — порядок завершения недетерминирован.- Активные транзакции в момент SIGTERM дожимаются через свой канал: HTTP-handler — через
srv.Shutdown, фоновые горутины — черезctx.Done()+WaitGroup.- Критичная секция транзакции открывается на
context.Background(), не на отменяемом контексте — отмена не должна рвать начатую запись.golang-migrateзапускается только на старте, не на shutdown — миф «очистить при выходе» не имеет смысла.- Ошибки закрытия пула —
Info, неError; нормальныйpool.Close()не повод для alert.pool.Close()раньшеWaitGroup.Wait()— pgx паникует при попытке взять соединение из закрытого пула, inconsistent state.
Пул соединений БД — последний ресурс в очереди на закрытие. Закрыть его раньше, чем завершились фоновые горутины, означает: scheduler или outbox-relay добегают до следующего pool.Acquire() и получают панику или ошибку на пустом месте, при том что бизнес-операция уже началась. Go не предоставляет DI-контейнера, который сам знает порядок; порядок задаётся руками в main через явную последовательность shutdown-шагов.
pgxpool закрывается последним
R-SHUT-DB-1: нет фреймворка, который управляет порядком за тебя. Explicit-последовательность в main — единственный источник правды.
// cmd/order-service/main.go
func run(ctx context.Context, cfg Config) error {
pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
if err != nil {
return fmt.Errorf("pgxpool.New: %w", err)
}
appState := health.NewState()
consumerCtx, cancelConsumer := context.WithCancel(ctx)
var consumerWg sync.WaitGroup
schedulerCtx, cancelScheduler := context.WithCancel(ctx)
var schedulerWg sync.WaitGroup
queries := db.New(pool)
consumer := consumer.NewOrderConsumer(queries)
relay := scheduler.NewOutboxRelay(queries, pool)
consumerWg.Add(1)
go func() { defer consumerWg.Done(); consumer.Run(consumerCtx) }()
schedulerWg.Add(1)
go func() { defer schedulerWg.Done(); relay.Run(schedulerCtx, &schedulerWg) }()
srv := buildServer(cfg, appState, queries)
go srv.ListenAndServe()
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(sigC)
<-sigC
slog.InfoContext(ctx, "получили SIGTERM, начинаем graceful shutdown")
// 1. readiness → 503: k8s убирает pod из endpoints
appState.SetNotReady()
// 2. consumer: сигнал остановки + ожидание текущего сообщения
cancelConsumer()
consumerWg.Wait()
// 3. scheduler: сигнал остановки + ожидание текущей итерации
cancelScheduler()
schedulerWg.Wait()
// 4. HTTP: дожать in-flight запросы
shutCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
defer cancel()
if err := srv.Shutdown(shutCtx); err != nil {
slog.WarnContext(ctx, "http shutdown", "error", err)
}
// 5. пул БД — последним
pool.Close()
slog.InfoContext(ctx, "pgxpool закрыт")
return nil
}
Порядок важен: consumer и scheduler держат соединения из пула; pool.Close() до WaitGroup.Wait() означает, что они нарвутся на закрытый пул в середине транзакции.
Активные транзакции дожимаются
R-SHUT-DB-2: механизм дожатия зависит от того, кто держит транзакцию.
HTTP-handler с транзакцией
sqlc-запросы в handler выполняются в рамках пула; srv.Shutdown даёт им завершиться:
// internal/handler/order_handler.go
func (h *OrderHandler) Create(w http.ResponseWriter, r *http.Request) {
var req CreateOrderRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httperr.Write(w, r, apperr.New(apperr.KindInvalid, "decode request", err))
return
}
order, err := h.queries.CreateOrder(r.Context(), db.CreateOrderParams{
CustomerID: req.CustomerID,
Amount: req.Amount,
})
if err != nil {
httperr.Write(w, r, apperr.New(apperr.KindInternal, "create order", err))
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(order)
}
При SIGTERM srv.Shutdown ждёт завершения in-flight запросов (R-SHUT-HTTP-1). Запрос либо успевает выполнить CreateOrder и вернуть 201, либо укладывается в shutdown-таймаут (25s) и клиент получает context.DeadlineExceeded → 503. В обоих случаях DB-состояние консистентно.
Фоновая горутина с транзакцией
Критичная секция: если контекст уже отменён (пришёл ctx.Done()), но транзакция уже начата — продолжать на context.Background():
// internal/scheduler/payment_settler.go
func (s *PaymentSettler) settle(ctx context.Context, orderID uuid.UUID) error {
// ctx может быть отменён — открываем tx на Background,
// чтобы SIGTERM не прервал начатую запись
tx, err := s.pool.Begin(context.Background())
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(context.Background())
q := db.New(tx)
order, err := q.LockOrderForUpdate(context.Background(), orderID)
if err != nil {
return fmt.Errorf("lock order %s: %w", orderID, err)
}
if err := q.UpdateOrderStatus(context.Background(), db.UpdateOrderStatusParams{
ID: order.ID,
Status: db.OrderStatusPaid,
}); err != nil {
return fmt.Errorf("update order status %s: %w", orderID, err)
}
if err := tx.Commit(context.Background()); err != nil {
return fmt.Errorf("commit payment settle %s: %w", orderID, err)
}
return nil
}
Горутина проверяет ctx.Done() перед началом новой итерации (не посередине):
// internal/scheduler/payment_settler.go
func (s *PaymentSettler) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.InfoContext(ctx, "payment settler: завершаем работу")
return
case <-ticker.C:
if err := s.processNextOrder(ctx); err != nil {
slog.WarnContext(ctx, "payment settler", "error", err)
}
}
}
}
Отмена контекста — сигнал «не начинать следующую итерацию», а не «прервать текущую».
Outbox-relay с транзакцией
Relay использует FOR UPDATE SKIP LOCKED — атомарный захват batch. Текущий batch доводится до конца:
// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
tx, err := r.pool.Begin(context.Background())
if err != nil {
return fmt.Errorf("begin outbox tx: %w", err)
}
defer tx.Rollback(context.Background())
q := db.New(tx)
events, err := q.LockOutboxBatch(context.Background(), batchSize)
if err != nil {
return fmt.Errorf("lock outbox batch: %w", err)
}
for _, e := range events {
if err := r.producer.Publish(context.Background(), e); err != nil {
return fmt.Errorf("publish event %s: %w", e.ID, err)
}
if err := q.MarkDispatched(context.Background(), e.ID); err != nil {
return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
}
}
if err := tx.Commit(context.Background()); err != nil {
return fmt.Errorf("commit outbox batch: %w", err)
}
return nil
}
Домен Product: outbox для ProductCreatedEvent — та же схема, LockOutboxBatch возвращает события независимо от типа, MarkDispatched проставляет dispatched_at.
golang-migrate — только старт
R-SHUT-DB-3: миграции запускаются один раз при старте, на shutdown — ничего.
// cmd/order-service/main.go
func applyMigrations(ctx context.Context, databaseURL string) error {
m, err := migrate.New("file://migrations", databaseURL)
if err != nil {
return fmt.Errorf("migrate.New: %w", err)
}
defer m.Close()
if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
return fmt.Errorf("migrate up: %w", err)
}
slog.InfoContext(ctx, "миграции применены")
return nil
}
Вызывается до pgxpool.New в run. На shutdown — m.Close() уже вызван через defer. Нет никакого «cleanup при выходе» — схема БД не откатывается при остановке сервиса.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
pool.Close() до consumerWg.Wait() / schedulerWg.Wait() | R-SHUT-DB-X1 | закрывать pool последним в explicit-последовательности |
pool.Close() в отдельной горутине без синхронизации | R-SHUT-DB-1 | sequential shutdown-шаги в main |
tx.Begin(r.Context()) в критичной секции фоновой задачи | R-SHUT-DB-2 | context.Background() для транзакций вне HTTP |
slog.Error при нормальном pool.Close() | R-SHUT-OBS-X1 | slog.Info — нормальное закрытие |
| SQL-скрипт очистки на shutdown | R-SHUT-DB-3 | golang-migrate только на startup |
pool.Acquire без проверки ошибки после получения сигнала | R-SHUT-DB-2 | проверять errors.Is(err, pgx.ErrNoRows) и ошибки пула |
cancelCtx() без wg.Wait() перед pool.Close() | R-SHUT-DB-X1 | дожидаться WaitGroup перед закрытием пула |
Куда дальше
- Рантайм/конфигурация —
http.Server.Shutdown,appState.SetNotReady(),os.Signalканал. - HTTP drain —
srv.Shutdown(ctx), in-flight дожатие, долгие эндпоинты → 202. - Фоновые задачи и outbox —
sync.WaitGroup,ctx.Done(), outbox-relay цикл. - Kafka shutdown —
kafka-goconsumer,CommitMessages,writer.Close(). - Идемпотентность in-flight —
Idempotency-Key, outbox-дедупликация. - Бюджеты и observability —
app_shutdown_duration_seconds, структурный лог. - Kubernetes —
terminationGracePeriodSeconds, preStop, probes.