---
title: "Фоновые горутины и outbox-relay — context.Context + sync.WaitGroup"
nav_title: "Фоновые горутины / outbox"
excerpt: "Graceful shutdown фоновых горутин и outbox-relay в Go: context.Context как сигнал остановки, sync.WaitGroup для ожидания завершения текущей итерации, FOR UPDATE SKIP LOCKED в…"
keywords: "go graceful shutdown goroutine, sync.WaitGroup shutdown, context.Context cancel goroutine, outbox relay go shutdown, FOR UPDATE SKIP LOCKED go, R-SHUT-SCHED go, pgx outbox graceful"
focus_keyword: "graceful shutdown горутины outbox Go"
tags:
  - graceful-shutdown
  - go
  - outbox
  - goroutine
  - context
---

# Фоновые горутины и outbox-relay — context.Context + sync.WaitGroup

> **Опирается на правила:** `R-SHUT-SCHED-1` … `R-SHUT-SCHED-3` и `R-SHUT-SCHED-X1` из Graceful Shutdown Style Guide → [раздел 5. Scheduled / async / outbox](/standards/backend/graceful-shutdown/#5-scheduled--async--outbox--r-shut-sched).

> **Важно знать**
> - **`context.Context`** — единственный сигнал остановки горутине; `ctx.Done()` проверяется перед каждой новой итерацией, не прерывает текущую.
> - **`sync.WaitGroup`** обязателен для каждой фоновой горутины — shutdown ждёт `wg.Wait()` до закрытия пула.
> - **`pool.Close()`** вызывается строго после `wg.Wait()` — иначе pgx паникует на попытке взять соединение из закрытого пула.
> - **Критичная секция транзакции** использует `context.Background()`, не отменённый родительский ctx — транзакция должна дойти до commit/rollback даже после SIGTERM.
> - **Outbox-relay** завершает текущий batch (`FOR UPDATE SKIP LOCKED` атомарно) и выходит; новый pod подхватывает не залоченные строки.
> - **`for { ... }` без `ctx.Done()`** — горутина игнорирует SIGTERM, WaitGroup никогда не освободится.
> - **Total budget 60s** — scheduler/async ≤ 20s в рамках cumulative бюджета (R-SHUT-OBS-1).

Фоновые горутины в Go — планировщики, воркеры, outbox-relay — это место, где graceful shutdown чаще всего ломается молча. Горутина без проверки `ctx.Done()` продолжает работу после SIGTERM; `WaitGroup` без `Wait()` позволяет `pool.Close()` выполниться раньше транзакции. В результате при деплое часть операций уходит без commit или без side-effect — состояние расходится.

UCP формулирует три требования: горутина завершает текущую итерацию, не начинает новую (R-SHUT-SCHED-1); долгий каскад держит `WaitGroup` до конца критичной секции (R-SHUT-SCHED-2); outbox-relay доводит batch, проверяет `ctx.Done()` перед следующим тиком (R-SHUT-SCHED-3).

## Структура фоновой горутины

Любая фоновая горутина получает `ctx context.Context` и `wg *sync.WaitGroup`. Вызывающий сторона регистрирует горутину в WaitGroup до запуска.

```go
// internal/scheduler/outbox_relay.go
type OutboxRelay struct {
    pool     *pgxpool.Pool
    queries  *db.Queries
    producer *kafka.Writer
    interval time.Duration
}

func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.processOneBatch(ctx); err != nil {
                slog.WarnContext(ctx, "outbox relay batch failed", "error", err)
            }
        }
    }
}
```

`defer wg.Done()` — первым, чтобы горутина освобождала счётчик при любом выходе. `select` с `ctx.Done()` проверяется перед каждым тиком, не внутри `processOneBatch` — это значит «не начинать новую итерацию», а не «прервать текущую». Если SIGTERM пришёл в момент `processOneBatch`, горутина дожидается его завершения и только потом выходит через `<-ctx.Done()` на следующем витке.

## Запуск и регистрация в main

```go
// cmd/order-service/main.go
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
    if err != nil {
        slog.Error("pgxpool connect", "error", err)
        os.Exit(1)
    }

    appState := health.NewState()
    queries := db.New(pool)
    producer := &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBroker), Topic: "order-events"}

    relay := &OutboxRelay{
        pool:     pool,
        queries:  queries,
        producer: producer,
        interval: 500 * time.Millisecond,
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go relay.Run(ctx, &wg)

    srv := &http.Server{Addr: cfg.Addr, Handler: buildRouter(appState, queries)}

    sigC := make(chan os.Signal, 1)
    signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
    defer signal.Stop(sigC)

    errC := make(chan error, 1)
    go func() { errC <- srv.ListenAndServe() }()

    select {
    case sig := <-sigC:
        slog.Info("получили SIGTERM, начинаем graceful shutdown", "signal", sig.String())
    case err := <-errC:
        slog.Error("server error", "error", err)
        cancel()
        return
    }

    appState.SetNotReady()         // R-SHUT-CFG-3 — readiness → 503 первым
    cancel()                       // сигнал всем горутинам: новых итераций не начинать

    shutCtx, shutCancel := context.WithTimeout(context.Background(), 25*time.Second)
    defer shutCancel()
    if err := srv.Shutdown(shutCtx); err != nil {
        slog.Error("http shutdown", "error", err)
    }

    wg.Wait()                      // R-SHUT-SCHED-1 — дождаться завершения горутин

    if err := producer.Close(); err != nil {
        slog.Error("kafka writer close", "error", err)
    }
    pool.Close()                   // R-SHUT-DB-1 — пул последним
    slog.Info("graceful shutdown завершён")
}
```

Порядок фиксирован: сначала readiness в `false`, потом `cancel()`, потом `srv.Shutdown`, потом `wg.Wait()`, потом `pool.Close()`. Менять порядок нельзя — `pool.Close()` раньше `wg.Wait()` даёт панику в транзакции.

## Outbox-relay: processOneBatch с FOR UPDATE SKIP LOCKED

`R-SHUT-SCHED-3`: критичная секция внутри `processOneBatch` работает до завершения независимо от состояния ctx. Транзакция открывается на `context.Background()` — не на отменённом `ctx`.

```go
// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    tx, err := r.pool.Begin(context.Background()) // R-SHUT-SCHED-2 — отдельный ctx для tx
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(context.Background())

    qtx := r.queries.WithTx(tx)

    events, err := qtx.LockOutboxBatch(context.Background(), db.LockOutboxBatchParams{
        Limit: 50,
    })
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }
    if len(events) == 0 {
        return nil
    }

    for _, e := range events {
        msg := kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
        }
        if err := r.producer.WriteMessages(context.Background(), msg); err != nil {
            return fmt.Errorf("publish event %s: %w", e.ID, err)
        }
        if err := qtx.MarkDispatched(context.Background(), e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    if err := tx.Commit(context.Background()); err != nil {
        return fmt.Errorf("commit outbox batch: %w", err)
    }
    return nil
}
```

SQL-запрос для `LockOutboxBatch` (sqlc):

```sql
-- name: LockOutboxBatch :many
SELECT id, aggregate_id, topic, payload
FROM outbox_event
WHERE dispatched_at IS NULL
ORDER BY created_at
LIMIT @limit
FOR UPDATE SKIP LOCKED;
```

`FOR UPDATE SKIP LOCKED` гарантирует: если на SIGTERM два pod'а запущены параллельно (rolling update), они не берут одни и те же строки. Когда старый pod завершает batch и commit, строки помечены `dispatched_at IS NOT NULL` — новый pod их пропустит. Если старый pod упал без commit — строки разблокированы, новый подхватит.

## Несколько горутин — общий WaitGroup

Если сервис запускает несколько фоновых задач (outbox-relay, консьюмер Kafka, periodic-cleaner), все они регистрируются в одном `WaitGroup`:

```go
// cmd/order-service/main.go — несколько горутин
var wg sync.WaitGroup

outboxRelay := NewOutboxRelay(pool, queries, producer)
wg.Add(1)
go outboxRelay.Run(ctx, &wg)

productConsumer := NewProductConsumer(kafkaReader, queries)
wg.Add(1)
go productConsumer.Run(ctx, &wg)

cleaner := NewExpiredOrderCleaner(pool, queries)
wg.Add(1)
go cleaner.Run(ctx, &wg)

// на shutdown — один общий wg.Wait()
cancel()
wg.Wait()
pool.Close()
```

Все горутины завершаются параллельно, shutdown ждёт медленнейшей. Если `OutboxRelay` доводит batch за 3s, а `ProductConsumer` — за 8s, суммарное ожидание 8s, не 11s.

## Пример: periodic-cleaner для устаревших Order

Горутина без IO-batch — проще, но структура та же:

```go
// internal/scheduler/order_cleaner.go
type ExpiredOrderCleaner struct {
    queries  *db.Queries
    interval time.Duration
    before   time.Duration
}

func (c *ExpiredOrderCleaner) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            deleted, err := c.queries.DeleteExpiredOrders(ctx, db.DeleteExpiredOrdersParams{
                Before: pgtype.Timestamptz{Time: time.Now().Add(-c.before), Valid: true},
            })
            if err != nil {
                slog.WarnContext(ctx, "delete expired orders", "error", err)
                continue
            }
            if deleted > 0 {
                slog.InfoContext(ctx, "expired orders deleted", "count", deleted)
            }
        }
    }
}
```

Здесь `ctx` передаётся в `DeleteExpiredOrders` напрямую — запрос не критичный, его прерывание при отмене контекста допустимо (ничего не потеряем, следующий pod доделает). Это отличие от outbox-relay, где незавершённый batch означает потенциальный дубль при публикации.

## Бюджет для outbox и scheduler

`R-SHUT-OBS-1`: outbox-relay и прочие горутины занимают ≤ 20s в cumulative бюджете 60s. Если batch из 50 событий занимает 5s (HTTP-вызовы к Kafka медленные) — уместиться реально. Если занимает 25s — уменьшить batch до 10-15 событий, не увеличивать бюджет.

```
preStop sleep        10s
http.Server.Shutdown ≤25s   (параллельно с cancel горутин)
goroutines WaitGroup ≤20s
pool.Close           ~1s
─────────────────────────
cumulative           ≤56s < 60s
```

`http.Server.Shutdown` и `wg.Wait()` выполняются последовательно в коде выше, но фактически HTTP дрейн заканчивается раньше, чем outbox завершает batch — в типичном сценарии они перекрываются во времени (preStop даёт 10s до SIGTERM, HTTP дожимается в первые секунды после cancel, outbox доводит batch в те же секунды).

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `for { processOneBatch(ctx) }` без `ctx.Done()` | `R-SHUT-SCHED-X1` | `select { case <-ctx.Done(): return; case <-ticker.C: }` |
| Горутина без `wg.Add(1)` / `wg.Done()` | `R-SHUT-SCHED-1` | `wg.Add(1)` до `go`, `defer wg.Done()` первым в горутине |
| `pool.Close()` до `wg.Wait()` | `R-SHUT-DB-X1` | `wg.Wait()` строго перед `pool.Close()` |
| Транзакция в критичной секции на отменённом `ctx` | `R-SHUT-SCHED-2` | `context.Background()` для `pool.Begin` и `tx.Commit` |
| `cancel()` без `wg.Wait()` перед завершением `main` | `R-SHUT-SCHED-X1` | явный `wg.Wait()` в shutdown-последовательности |
| Batch размером 100+ событий без замера времени | `R-SHUT-OBS-1` | замерить, уменьшить batch до укладки в ≤ 20s |
| `slog.Error` на нормальное завершение горутины | `R-SHUT-OBS-X1` | `slog.Info` для штатного выхода |

## Куда дальше

- [Бюджеты и observability](go/budgets-and-observability.md) — cumulative 60s бюджет, метрика `app_shutdown_duration_seconds`.
- [БД и persistence](go/db-and-persistence.md) — порядок `pool.Close()` и транзакции на shutdown.
- [HTTP drain](go/http-drain.md) — `http.Server.Shutdown` и preStop координация.
- [Идемпотентность in-flight](go/idempotency-in-flight.md) — retry-safety для outbox и Kafka-handler.
- [Kafka shutdown](go/kafka-shutdown.md) — `writer.Close()` и commit offset при отмене контекста.
- [Kubernetes](go/kubernetes.md) — `terminationGracePeriodSeconds`, preStop, rolling update.
