← назад к разделу

Когда сервис получает SIGTERM, он начинает корректное завершение: перестаёт принимать новые запросы, дожидается текущих и останавливается. Большинство операций успевает завершиться в отведённые 25–30 секунд. Но если операция не уложилась — её можно повторить только при одном условии: повторный запрос не должен создавать дубль.

Это и есть идемпотентность — свойство операции, при котором выполнить её один раз или несколько раз одинаково безопасно.

Почему одного graceful shutdown недостаточно

Представьте: сервис обрабатывает заказ и посылает запрос на списание денег. В этот момент приходит SIGTERM. http.Server.Shutdown в Go дожидается, пока HTTP-соединения завершатся, но у него есть таймаут — обычно 25–30 секунд. Если за это время списание не завершилось, сервис останавливается принудительно.

Новый экземпляр сервиса поднимается и обрабатывает тот же заказ заново. Если запрос на списание не идемпотентен — деньги спишутся дважды.

Graceful shutdown даёт время, но не гарантирует завершённость. Идемпотентность — страховка на тот случай, когда времени не хватило.

Исходящий HTTP-запрос: Idempotency-Key

Самый распространённый случай — сервис вызывает внешний платёжный провайдер или другой сервис через HTTP. При повторном запросе сервер должен понять: «этот запрос я уже обработал».

Для этого используется заголовок Idempotency-Key — уникальный ключ, который клиент ставит на запрос. Сервер запоминает ключ и при повторе возвращает прежний результат, не выполняя операцию заново.

Главное правило: ключ генерируется один раз до всех повторных попыток, а не при каждой попытке.

// internal/adapters/out/payment/client.go

type ChargeCommand struct {
    IdempotencyKey string
    OrderID        string
    CustomerID     string
    AmountKopecks  int64
}

func (c *Client) Charge(ctx context.Context, cmd ChargeCommand) error {
    body, _ := json.Marshal(map[string]any{
        "order_id":    cmd.OrderID,
        "customer_id": cmd.CustomerID,
        "amount":      cmd.AmountKopecks,
    })

    req, _ := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/charges", bytes.NewReader(body))
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) // один ключ на всю операцию

    resp, err := c.http.Do(req)
    if err != nil {
        return fmt.Errorf("charge request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("charge failed: status %d", resp.StatusCode)
    }
    return nil
}

Ключ формируется на уровне бизнес-логики — детерминированно, из данных самой операции:

// internal/usecase/confirm_order.go

func (uc *ConfirmOrderUseCase) Execute(ctx context.Context, cmd ConfirmOrderCommand) error {
    order, err := uc.orders.FindByID(ctx, cmd.OrderID)
    if err != nil {
        return fmt.Errorf("find order: %w", err)
    }

    // Детерминированный ключ: один и тот же для одного и того же запроса
    idempotencyKey := order.ID + ":charge:" + cmd.RequestID

    return uc.payment.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: idempotencyKey,
        OrderID:        order.ID,
        CustomerID:     order.CustomerID,
        AmountKopecks:  order.TotalKopecks,
    })
}

Частая ошибка — вызывать uuid.New() при каждой попытке. Тогда каждый повторный запрос уходит с новым ключом, и провайдер считает его отдельной операцией.

Kafka-consumer: дедупликация через processed_event

kafka-go не коммитит offset автоматически — коммит делается явно через CommitMessages. Если сервис получил сообщение из Kafka, обработал его, но не успел закоммитить offset до SIGTERM — следующий запуск consumer прочитает то же сообщение снова.

Стандартное решение: таблица processed_event, запись в которую делается в одной транзакции с основным действием. Если сообщение пришло повторно — запись уже есть, и операцию пропускаем.

// internal/consumer/order_consumer.go

func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := c.queries.WithTx(tx)

    // Записываем факт обработки — в той же транзакции, что и само действие
    inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:    event.ID,
        ConsumerID: "billing-service",
    })
    if err != nil {
        return fmt.Errorf("insert processed event: %w", err)
    }
    if !inserted {
        return nil // уже обработано — пропускаем
    }

    if err := c.billing.Charge(ctx, payment.ChargeCommand{
        IdempotencyKey: event.ID + ":billing",
        OrderID:        event.OrderID,
        CustomerID:     event.CustomerID,
        AmountKopecks:  event.TotalKopecks,
    }); err != nil {
        return fmt.Errorf("charge order %s: %w", event.OrderID, err)
    }

    return tx.Commit(ctx)
}

SQL-запрос для дедупликации требует внимания: нельзя использовать ON CONFLICT DO NOTHING, потому что тогда RETURNING при конфликте вернёт 0 строк, что приведёт к ошибке pgx.ErrNoRows. Правильный вариант — DO UPDATE, который всегда возвращает строку:

-- name: InsertProcessedEvent :one
INSERT INTO processed_event (event_id, consumer_id, processed_at)
VALUES (@event_id, @consumer_id, now())
ON CONFLICT (event_id, consumer_id) DO UPDATE
    SET processed_at = EXCLUDED.processed_at
RETURNING (xmax = 0) AS inserted;

При первой вставке xmax = 0 и возвращается inserted = true. При повторе строка обновляется, xmax становится ненулевым и возвращается inserted = false.

Outbox-relay: двухфазная публикация

Outbox — это таблица событий, которые нужно отправить в Kafka. Relay-горутина периодически читает из неё и публикует. Проблема: если сообщение в Kafka отправлено, но статус в таблице не обновлён до SIGTERM — relay отправит его снова.

Есть два подхода.

Если все downstream-consumer реализуют processed_event — relay может публиковать дубли, consumer их отфильтрует. Это проще:

// internal/scheduler/outbox_relay.go

func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
    // context.Background() — чтобы SIGTERM не прервал commit уже начатой транзакции.
    // Новые batch не начинаются благодаря проверке ctx.Done() в Run.
    bgCtx := context.Background()

    tx, err := r.pool.Begin(bgCtx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(bgCtx)

    q := r.queries.WithTx(tx)
    events, err := q.LockPendingOutboxBatch(bgCtx, r.batchSize) // FOR UPDATE SKIP LOCKED
    if err != nil {
        return fmt.Errorf("lock outbox batch: %w", err)
    }

    for _, e := range events {
        if err := r.writer.WriteMessages(ctx, kafka.Message{
            Key:   []byte(e.AggregateID),
            Value: e.Payload,
            Headers: []kafka.Header{
                {Key: "event-id", Value: []byte(e.ID)},
            },
        }); err != nil {
            return fmt.Errorf("write outbox event %s: %w", e.ID, err)
        }

        if err := q.MarkOutboxDispatched(bgCtx, e.ID); err != nil {
            return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
        }
    }

    return tx.Commit(bgCtx)
}

Если downstream-consumer не контролируется — нужна явная двухфазная публикация через статусы PENDING → PUBLISHING → PUBLISHED:

-- Перевод в PUBLISHING атомарно (FOR UPDATE SKIP LOCKED)
-- name: LockAndMarkPublishing :many
UPDATE outbox_event
SET status = 'PUBLISHING', locked_at = now()
WHERE id IN (
    SELECT id FROM outbox_event
    WHERE status = 'PENDING'
    ORDER BY created_at
    LIMIT @batch_size
    FOR UPDATE SKIP LOCKED
)
RETURNING *;

-- Перевод в PUBLISHED после успешной отправки
-- name: MarkPublished :exec
UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = @id;

Если SIGTERM пришёл между отправкой и MarkPublished, строки зависают в PUBLISHING. Cleanup-горутина через настраиваемый TTL возвращает их в PENDING:

func (c *OutboxCleanup) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := c.queries.ResetStuckPublishing(ctx, db.ResetStuckPublishingParams{
                StuckAfter: pgtype.Interval{Microseconds: int64(c.stuckAfter / time.Microsecond), Valid: true},
            }); err != nil {
                slog.WarnContext(ctx, "outbox cleanup failed", "error", err)
            }
        }
    }
}
-- name: ResetStuckPublishing :exec
UPDATE outbox_event
SET status = 'PENDING', locked_at = NULL
WHERE status = 'PUBLISHING'
  AND locked_at < now() - @stuck_after::interval;

context.Background() в relay — зачем

Relay-горутина получает отменяемый контекст, который отменяется при SIGTERM. Но если передавать этот же контекст в pgx-транзакцию — SIGTERM отменит транзакцию прямо в момент коммита, и пачка откатится.

Решение: использовать context.Background() для транзакции внутри пачки, а проверку отмены делать между итерациями:

func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return // не начинаем новый batch
        case <-ticker.C:
        }

        batchCtx, cancel := context.WithTimeout(context.Background(), r.batchTimeout)
        if err := r.processOneBatch(batchCtx); err != nil {
            slog.WarnContext(ctx, "outbox relay batch", "error", err)
        }
        cancel()
    }
}

ctx.Done() — сигнал остановить цикл. context.Background() для пачки — чтобы уже начатая транзакция могла закоммититься.

Коротко

  • Graceful shutdown даёт время завершить операции, но не гарантирует это. Операции должны быть безопасны для повтора — это и есть идемпотентность.
  • Для исходящего HTTP POST: ставить заголовок Idempotency-Key в адаптере. Ключ формируется детерминированно один раз на бизнес-операцию, а не генерируется заново при каждой попытке.
  • Для Kafka-consumer: записывать факт обработки в таблицу processed_event в одной pgx-транзакции с основным действием. SQL — ON CONFLICT DO UPDATE ... RETURNING (xmax = 0) AS inserted (не DO NOTHING — при конфликте DO NOTHING не возвращает строку).
  • Для outbox-relay: если downstream не контролируется — двухфазный статус PENDING → PUBLISHING → PUBLISHED плюс cleanup-горутина для зависших записей.
  • В relay-горутине использовать context.Background() для pgx-транзакции внутри пачки, а проверку отмены делать между итерациями через select { case <-ctx.Done(): return }.

Что почитать дальше

  • HTTP drain в Go — как http.Server.Shutdown дожидается запросов и что делать с долгими эндпоинтами.
  • Kafka shutdown в Go — consumer через context cancellation, writer.Close().
  • Фоновые задачи и outbox в Go — sync.WaitGroup, outbox-relay цикл, cleanup-горутина.
  • БД и persistence в Go — порядок shutdown-последовательности, pgxpool.Pool.Close() последним.