---
title: "Идемпотентность in-flight операций в Go — защита от replay при SIGTERM"
nav_title: "Идемпотентность in-flight"
excerpt: "Идемпотентность in-flight в Go (net/http+chi, pgx, kafka-go): Idempotency-Key в адаптере, processed_event в pgx-транзакции, двух-фаза outbox — защита от replay при SIGTERM."
keywords: "идемпотентность graceful shutdown go, replay-safe go, processed_event pgx, Idempotency-Key net/http, outbox dedup go, R-SHUT-IDEM, kafka-go idempotent consumer"
focus_keyword: "идемпотентность graceful shutdown go"
tags:
  - go
  - graceful-shutdown
  - idempotency
  - kafka
  - pgx
---

# Идемпотентность in-flight операций в Go — защита от replay при SIGTERM

> **Опирается на правила:** `R-SHUT-IDEM-1`, `R-SHUT-IDEM-X1` из контракта Graceful Shutdown → [раздел 7. Идемпотентность in-flight](/standards/backend/graceful-shutdown/#7-идемпотентность-in-flight-операций--r-shut-idem).

> **Важно знать**
> - Операции, которые SIGTERM может прервать, обязаны быть retry-safe — graceful shutdown даёт **время**, но не гарантирует **отсутствие partial** (`R-SHUT-IDEM-1`).
> - В Go нет фреймворкового `@Transactional`; транзакция передаётся явно через `context.Context` или параметром — ответственность за атомарность processed_event + side-effect лежит на вас.
> - HTTP-адаптер выходящего вызова обязан ставить `Idempotency-Key` заголовок **один раз на бизнес-операцию** — до всех retry; новый UUID на каждый retry создаёт дубль.
> - `kafka-go` не коммитит offset автоматически; `CommitMessages` вызывается явно — это защита от потери, но не от дублей при SIGTERM в окне обработки.
> - processed_event через `INSERT ... ON CONFLICT (event_id) DO NOTHING` в одной pgx-транзакции с side-effect — достаточная защита для Kafka-handler.
> - Outbox-relay: Kafka-send прошёл, UPDATE `published_at` не дошёл → следующий relay пошлёт снова; защита — receiver-side dedup или двух-фаза `PENDING → PUBLISHING → PUBLISHED`.
> - Money-cascade через `http.Client` с retry без `Idempotency-Key` — SIGTERM в момент первого запроса, новый pod шлёт повторно, provider обрабатывает оба (`R-SHUT-IDEM-X1`).
> - Идемпотентность — последняя линия защиты; graceful shutdown (`http.Server.Shutdown`, `WaitGroup`, context cancellation) — первая.

Graceful shutdown в Go управляется явно: `os.Signal` канал, `context.WithCancel`, `http.Server.Shutdown(ctx)`, `sync.WaitGroup` для фоновых горутин. `http.Server.Shutdown` дожидается in-flight HTTP-запросов — большинство операций успевает завершиться в штатный таймаут 25–30 s. Но долгий cascade (HTTP с retry × 3 × 10 s) может не уложиться, а SIGKILL приходит через 60 s. Если операция не retry-safe — partial state.

## Три типа in-flight операций

`R-SHUT-IDEM-1` разбивает защиту по контексту: исходящий HTTP, Kafka-handler, outbox-relay.

### 1. Исходящий HTTP POST — Idempotency-Key в адаптере

Ключ генерируется **до** всех retry и передаётся адаптером наружу. В Go retry-логика часто встроена в transport-обёртку или реализуется вручную — в обоих случаях ключ фиксируется раз на бизнес-операцию.

```go
// internal/adapters/out/payment/client.go

type ChargeCommand struct {
    IdempotencyKey string
    OrderID        string
    CustomerID     string
    AmountKopecks  int64
}

func (c *Client) Charge(ctx context.Context, cmd ChargeCommand) error {
    body, err := json.Marshal(map[string]any{
        "order_id":    cmd.OrderID,
        "customer_id": cmd.CustomerID,
        "amount":      cmd.AmountKopecks,
    })
    if err != nil {
        return fmt.Errorf("marshal charge request: %w", err)
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/charges", bytes.NewReader(body))
    if err != nil {
        return fmt.Errorf("new charge request: %w", err)
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) // R-SHUT-IDEM-1

    resp, err := c.http.Do(req)
    if err != nil {
        return fmt.Errorf("charge request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("charge failed: status %d", resp.StatusCode)
    }
    return nil
}
```

Генерация ключа — на уровне use case, один раз:

```go
// internal/usecase/confirm_order.go

func (uc *ConfirmOrderUseCase) Execute(ctx context.Context, cmd ConfirmOrderCommand) error {
    order, err := uc.orders.FindByID(ctx, cmd.OrderID)
    if err != nil {
        return fmt.Errorf("find order: %w", err)
    }

    idempotencyKey := order.ID + ":charge:" + cmd.RequestID // детерминированный, не uuid.New()

    return uc.payment.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: idempotencyKey,
        OrderID:        order.ID,
        CustomerID:     order.CustomerID,
        AmountKopecks:  order.TotalKopecks,
    })
}
```

Детерминированный ключ (`orderID + ":" + requestID`) — идемпотентен при повторной обработке того же события. `uuid.New()` внутри retry создаёт уникальный ключ на каждый attempt — дубль.

### 2. Kafka-handler — processed_event в одной pgx-транзакции

`kafka-go` вызывает `CommitMessages` явно после обработки. При SIGTERM в окне между обработкой и коммитом offset consumer перечитает сообщение. `processed_event` в той же транзакции что side-effect — атомарная защита: либо оба закоммичены, либо оба откатились.

```go
// internal/consumer/order_consumer.go

func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal order confirmed: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := c.queries.WithTx(tx)

    inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:    event.ID,
        ConsumerID: "billing-service",
    })
    if err != nil {
        return fmt.Errorf("insert processed event: %w", err)
    }
    if !inserted {
        return nil // дубль — уже обработали
    }

    if err := c.billing.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: event.ID + ":billing",
        OrderID:        event.OrderID,
        CustomerID:     event.CustomerID,
        AmountKopecks:  event.TotalKopecks,
    }); err != nil {
        return fmt.Errorf("charge order %s: %w", event.OrderID, err)
    }

    return tx.Commit(ctx)
}
```

SQL-запрос для дедупликации:

```sql
-- query.sql (sqlc)
-- name: InsertProcessedEvent :one
INSERT INTO processed_event (event_id, consumer_id, processed_at)
VALUES (@event_id, @consumer_id, now())
ON CONFLICT (event_id, consumer_id) DO NOTHING
RETURNING (xmax = 0) AS inserted;
```

`xmax = 0` возвращает `true` только когда строка была реально вставлена, `false` при конфликте. Так handler узнаёт без отдельного SELECT, нужна ли обработка.

### 3. Outbox-relay — двух-фаза или receiver-side dedup

Relay публикует события из таблицы outbox в Kafka. Если Kafka-send прошёл, а UPDATE `published_at` нет — следующий запуск relay пошлёт то же событие повторно.

**Вариант A: receiver-side dedup (проще)**

Если все downstream consumer'ы реализуют processed_event (как выше) — relay может публиковать дубли, consumer'ы их поглотят. Relay проще — нет состояний publishing:

```go
// internal/scheduler/outbox_relay.go

func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    tx, err := r.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := r.queries.WithTx(tx)

    events, err := q.LockPendingOutboxBatch(ctx, r.batchSize) // FOR UPDATE SKIP LOCKED
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }

    for _, e := range events {
        if err := r.writer.WriteMessages(ctx, kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
            Headers: []kafka.Header{
                {Key: "event-id", Value: []byte(e.ID)},
            },
        }); err != nil {
            return fmt.Errorf("write outbox event %s: %w", e.ID, err)
        }

        if err := q.MarkOutboxDispatched(ctx, e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    return tx.Commit(ctx)
}
```

**Вариант B: двух-фаза (когда downstream dedup недоступен)**

```sql
-- Статусы: PENDING → PUBLISHING → PUBLISHED
ALTER TABLE outbox_event ADD COLUMN status text NOT NULL DEFAULT 'PENDING';

-- name: LockAndMarkPublishing :many
UPDATE outbox_event
SET status = 'PUBLISHING', locked_at = now()
WHERE id IN (
    SELECT id FROM outbox_event
    WHERE status = 'PENDING'
    ORDER BY created_at
    LIMIT @batch_size
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

-- name: MarkPublished :exec
UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = @id;
```

При SIGTERM между send и MarkPublished строки остаются в `PUBLISHING`. Cleanup-горутина через настраиваемый TTL (например, 5 минут) возвращает зависшие строки в `PENDING`:

```go
// internal/scheduler/outbox_cleanup.go

func (c *OutboxCleanup) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := c.queries.ResetStuckPublishing(ctx, db.ResetStuckPublishingParams{
                StuckAfter: pgtype.Interval{Microseconds: int64(c.stuckAfter / time.Microsecond), Valid: true},
            }); err != nil {
                slog.WarnContext(ctx, "outbox cleanup failed", "error", err)
            }
        }
    }
}
```

```sql
-- name: ResetStuckPublishing :exec
UPDATE outbox_event
SET status = 'PENDING', locked_at = NULL
WHERE status = 'PUBLISHING'
  AND locked_at < now() - @stuck_after::interval;
```

## Граничный случай: транзакция outbox на context.Background()

Relay-горутина управляется отменяемым контекстом. При SIGTERM контекст отменяется — `processOneBatch` должна завершить **текущий** batch, не начинать новый. Критичная секция использует `context.Background()` для pgx-транзакции, чтобы SIGTERM не прервал commit:

```go
func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return // не начинаем новый batch
        case <-ticker.C:
        }

        // processOneBatch получает context.Background() для tx — не прерываем commit
        batchCtx, cancel := context.WithTimeout(context.Background(), r.batchTimeout)
        if err := r.processOneBatch(batchCtx); err != nil {
            slog.WarnContext(ctx, "outbox relay batch", "error", err)
        }
        cancel()
    }
}
```

`ctx.Done()` — сигнал «не начинать новый batch». `context.Background()` для самого batch — чтобы pgx-транзакция не получила cancel в середине commit.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `uuid.New()` внутри retry-цикла как Idempotency-Key | `R-SHUT-IDEM-X1` | детерминированный ключ на бизнес-операцию: `orderID + ":" + requestID` |
| Money через `http.Client` без `Idempotency-Key` заголовка | `R-SHUT-IDEM-X1` | `req.Header.Set("Idempotency-Key", cmd.IdempotencyKey)` в адаптере |
| `InsertProcessedEvent` отдельным запросом вне pgx-транзакции с side-effect | `R-SHUT-IDEM-1` | один `tx`, оба действия внутри |
| `CommitMessages` до завершения side-effect в Kafka-handler | `R-SHUT-IDEM-1` | `CommitMessages` после `tx.Commit` |
| Outbox-relay `processOneBatch` с отменяемым контекстом для pgx-транзакции | `R-SHUT-IDEM-1` | `context.Background()` + отдельный timeout для критичной секции |
| `for { processOneBatch(ctx) }` без проверки `ctx.Done()` между итерациями | `R-SHUT-SCHED-3` | `select { case <-ctx.Done(): return; case <-ticker.C: }` |
| Kafka-handler без processed_event при возможном replay offset | `R-SHUT-IDEM-1` | `ON CONFLICT (event_id, consumer_id) DO NOTHING` в одной транзакции |

## Куда дальше

- [Бюджеты и observability](go/budgets-and-observability.md) — метрика `app_shutdown_duration_seconds`, cumulative-бюджет для Go-стека.
- [БД и persistence](go/db-and-persistence.md) — `pgxpool.Pool.Close()` последним, порядок shutdown-последовательности.
- [HTTP drain](go/http-drain.md) — `http.Server.Shutdown` vs `Close`, preStop hook, 202+polling для долгих эндпоинтов.
- [Конфигурация shutdown](go/runtime-server-config.md) — `os.Signal` канал, `context.WithTimeout`, `health.State` с `atomic.Bool`.
- [Kafka shutdown](go/kafka-shutdown.md) — `kafka-go` consumer через context cancellation, `writer.Close()`.
- [Kubernetes](go/kubernetes.md) — `terminationGracePeriodSeconds: 60`, preStop, probes.
- [Фоновые задачи и outbox](go/scheduled-async-outbox.md) — `sync.WaitGroup`, outbox-relay цикл, cleanup-горутина.
