Опирается на правила: R-SHUT-IDEM-1, R-SHUT-IDEM-X1 из контракта Graceful Shutdown → раздел 7. Идемпотентность in-flight.

Важно знать

  • Операции, которые SIGTERM может прервать, обязаны быть retry-safe — graceful shutdown даёт время, но не гарантирует отсутствие partial (R-SHUT-IDEM-1).
  • В Go нет фреймворкового @Transactional; транзакция передаётся явно через context.Context или параметром — ответственность за атомарность processed_event + side-effect лежит на вас.
  • HTTP-адаптер выходящего вызова обязан ставить Idempotency-Key заголовок один раз на бизнес-операцию — до всех retry; новый UUID на каждый retry создаёт дубль.
  • kafka-go не коммитит offset автоматически; CommitMessages вызывается явно — это защита от потери, но не от дублей при SIGTERM в окне обработки.
  • processed_event через INSERT ... ON CONFLICT (event_id) DO NOTHING в одной pgx-транзакции с side-effect — достаточная защита для Kafka-handler.
  • Outbox-relay: Kafka-send прошёл, UPDATE published_at не дошёл → следующий relay пошлёт снова; защита — receiver-side dedup или двух-фаза PENDING → PUBLISHING → PUBLISHED.
  • Money-cascade через http.Client с retry без Idempotency-Key — SIGTERM в момент первого запроса, новый pod шлёт повторно, provider обрабатывает оба (R-SHUT-IDEM-X1).
  • Идемпотентность — последняя линия защиты; graceful shutdown (http.Server.Shutdown, WaitGroup, context cancellation) — первая.

Graceful shutdown в Go управляется явно: os.Signal канал, context.WithCancel, http.Server.Shutdown(ctx), sync.WaitGroup для фоновых горутин. http.Server.Shutdown дожидается in-flight HTTP-запросов — большинство операций успевает завершиться в штатный таймаут 25–30 s. Но долгий cascade (HTTP с retry × 3 × 10 s) может не уложиться, а SIGKILL приходит через 60 s. Если операция не retry-safe — partial state.

Три типа in-flight операций

R-SHUT-IDEM-1 разбивает защиту по контексту: исходящий HTTP, Kafka-handler, outbox-relay.

1. Исходящий HTTP POST — Idempotency-Key в адаптере

Ключ генерируется до всех retry и передаётся адаптером наружу. В Go retry-логика часто встроена в transport-обёртку или реализуется вручную — в обоих случаях ключ фиксируется раз на бизнес-операцию.

// internal/adapters/out/payment/client.go

type ChargeCommand struct {
    IdempotencyKey string
    OrderID        string
    CustomerID     string
    AmountKopecks  int64
}

func (c *Client) Charge(ctx context.Context, cmd ChargeCommand) error {
    body, err := json.Marshal(map[string]any{
        "order_id":    cmd.OrderID,
        "customer_id": cmd.CustomerID,
        "amount":      cmd.AmountKopecks,
    })
    if err != nil {
        return fmt.Errorf("marshal charge request: %w", err)
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/charges", bytes.NewReader(body))
    if err != nil {
        return fmt.Errorf("new charge request: %w", err)
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) // R-SHUT-IDEM-1

    resp, err := c.http.Do(req)
    if err != nil {
        return fmt.Errorf("charge request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("charge failed: status %d", resp.StatusCode)
    }
    return nil
}

Генерация ключа — на уровне use case, один раз:

// internal/usecase/confirm_order.go

func (uc *ConfirmOrderUseCase) Execute(ctx context.Context, cmd ConfirmOrderCommand) error {
    order, err := uc.orders.FindByID(ctx, cmd.OrderID)
    if err != nil {
        return fmt.Errorf("find order: %w", err)
    }

    idempotencyKey := order.ID + ":charge:" + cmd.RequestID // детерминированный, не uuid.New()

    return uc.payment.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: idempotencyKey,
        OrderID:        order.ID,
        CustomerID:     order.CustomerID,
        AmountKopecks:  order.TotalKopecks,
    })
}

Детерминированный ключ (orderID + ":" + requestID) — идемпотентен при повторной обработке того же события. uuid.New() внутри retry создаёт уникальный ключ на каждый attempt — дубль.

2. Kafka-handler — processed_event в одной pgx-транзакции

kafka-go вызывает CommitMessages явно после обработки. При SIGTERM в окне между обработкой и коммитом offset consumer перечитает сообщение. processed_event в той же транзакции что side-effect — атомарная защита: либо оба закоммичены, либо оба откатились.

// internal/consumer/order_consumer.go

func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal order confirmed: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := c.queries.WithTx(tx)

    inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:    event.ID,
        ConsumerID: "billing-service",
    })
    if err != nil {
        return fmt.Errorf("insert processed event: %w", err)
    }
    if !inserted {
        return nil // дубль — уже обработали
    }

    if err := c.billing.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: event.ID + ":billing",
        OrderID:        event.OrderID,
        CustomerID:     event.CustomerID,
        AmountKopecks:  event.TotalKopecks,
    }); err != nil {
        return fmt.Errorf("charge order %s: %w", event.OrderID, err)
    }

    return tx.Commit(ctx)
}

SQL-запрос для дедупликации:

-- query.sql (sqlc)
-- name: InsertProcessedEvent :one
INSERT INTO processed_event (event_id, consumer_id, processed_at)
VALUES (@event_id, @consumer_id, now())
ON CONFLICT (event_id, consumer_id) DO NOTHING
RETURNING (xmax = 0) AS inserted;

xmax = 0 возвращает true только когда строка была реально вставлена, false при конфликте. Так handler узнаёт без отдельного SELECT, нужна ли обработка.

3. Outbox-relay — двух-фаза или receiver-side dedup

Relay публикует события из таблицы outbox в Kafka. Если Kafka-send прошёл, а UPDATE published_at нет — следующий запуск relay пошлёт то же событие повторно.

Вариант A: receiver-side dedup (проще)

Если все downstream consumer'ы реализуют processed_event (как выше) — relay может публиковать дубли, consumer'ы их поглотят. Relay проще — нет состояний publishing:

// internal/scheduler/outbox_relay.go

func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    tx, err := r.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := r.queries.WithTx(tx)

    events, err := q.LockPendingOutboxBatch(ctx, r.batchSize) // FOR UPDATE SKIP LOCKED
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }

    for _, e := range events {
        if err := r.writer.WriteMessages(ctx, kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
            Headers: []kafka.Header{
                {Key: "event-id", Value: []byte(e.ID)},
            },
        }); err != nil {
            return fmt.Errorf("write outbox event %s: %w", e.ID, err)
        }

        if err := q.MarkOutboxDispatched(ctx, e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    return tx.Commit(ctx)
}

Вариант B: двух-фаза (когда downstream dedup недоступен)

-- Статусы: PENDING → PUBLISHING → PUBLISHED
ALTER TABLE outbox_event ADD COLUMN status text NOT NULL DEFAULT 'PENDING';

-- name: LockAndMarkPublishing :many
UPDATE outbox_event
SET status = 'PUBLISHING', locked_at = now()
WHERE id IN (
    SELECT id FROM outbox_event
    WHERE status = 'PENDING'
    ORDER BY created_at
    LIMIT @batch_size
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

-- name: MarkPublished :exec
UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = @id;

При SIGTERM между send и MarkPublished строки остаются в PUBLISHING. Cleanup-горутина через настраиваемый TTL (например, 5 минут) возвращает зависшие строки в PENDING:

// internal/scheduler/outbox_cleanup.go

func (c *OutboxCleanup) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := c.queries.ResetStuckPublishing(ctx, db.ResetStuckPublishingParams{
                StuckAfter: pgtype.Interval{Microseconds: int64(c.stuckAfter / time.Microsecond), Valid: true},
            }); err != nil {
                slog.WarnContext(ctx, "outbox cleanup failed", "error", err)
            }
        }
    }
}
-- name: ResetStuckPublishing :exec
UPDATE outbox_event
SET status = 'PENDING', locked_at = NULL
WHERE status = 'PUBLISHING'
  AND locked_at < now() - @stuck_after::interval;

Граничный случай: транзакция outbox на context.Background()

Relay-горутина управляется отменяемым контекстом. При SIGTERM контекст отменяется — processOneBatch должна завершить текущий batch, не начинать новый. Критичная секция использует context.Background() для pgx-транзакции, чтобы SIGTERM не прервал commit:

func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return // не начинаем новый batch
        case <-ticker.C:
        }

        // processOneBatch получает context.Background() для tx — не прерываем commit
        batchCtx, cancel := context.WithTimeout(context.Background(), r.batchTimeout)
        if err := r.processOneBatch(batchCtx); err != nil {
            slog.WarnContext(ctx, "outbox relay batch", "error", err)
        }
        cancel()
    }
}

ctx.Done() — сигнал «не начинать новый batch». context.Background() для самого batch — чтобы pgx-транзакция не получила cancel в середине commit.

Что запрещено

АнтипаттернПравилоЧто взамен
uuid.New() внутри retry-цикла как Idempotency-KeyR-SHUT-IDEM-X1детерминированный ключ на бизнес-операцию: orderID + ":" + requestID
Money через http.Client без Idempotency-Key заголовкаR-SHUT-IDEM-X1req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) в адаптере
InsertProcessedEvent отдельным запросом вне pgx-транзакции с side-effectR-SHUT-IDEM-1один tx, оба действия внутри
CommitMessages до завершения side-effect в Kafka-handlerR-SHUT-IDEM-1CommitMessages после tx.Commit
Outbox-relay processOneBatch с отменяемым контекстом для pgx-транзакцииR-SHUT-IDEM-1context.Background() + отдельный timeout для критичной секции
for { processOneBatch(ctx) } без проверки ctx.Done() между итерациямиR-SHUT-SCHED-3select { case <-ctx.Done(): return; case <-ticker.C: }
Kafka-handler без processed_event при возможном replay offsetR-SHUT-IDEM-1ON CONFLICT (event_id, consumer_id) DO NOTHING в одной транзакции

Куда дальше

  • Бюджеты и observability — метрика app_shutdown_duration_seconds, cumulative-бюджет для Go-стека.
  • БД и persistence — pgxpool.Pool.Close() последним, порядок shutdown-последовательности.
  • HTTP drain — http.Server.Shutdown vs Close, preStop hook, 202+polling для долгих эндпоинтов.
  • Конфигурация shutdownos.Signal канал, context.WithTimeout, health.State с atomic.Bool.
  • Kafka shutdown — kafka-go consumer через context cancellation, writer.Close().
  • Kubernetes — terminationGracePeriodSeconds: 60, preStop, probes.
  • Фоновые задачи и outbox — sync.WaitGroup, outbox-relay цикл, cleanup-горутина.