Когда сервис получает SIGTERM, он начинает корректное завершение: перестаёт принимать новые запросы, дожидается текущих и останавливается. Большинство операций успевает завершиться в отведённые 25–30 секунд. Но если операция не уложилась — её можно повторить только при одном условии: повторный запрос не должен создавать дубль.
Это и есть идемпотентность — свойство операции, при котором выполнить её один раз или несколько раз одинаково безопасно.
Почему одного graceful shutdown недостаточно
Представьте: сервис обрабатывает заказ и посылает запрос на списание денег. В этот момент приходит SIGTERM. http.Server.Shutdown в Go дожидается, пока HTTP-соединения завершатся, но у него есть таймаут — обычно 25–30 секунд. Если за это время списание не завершилось, сервис останавливается принудительно.
Новый экземпляр сервиса поднимается и обрабатывает тот же заказ заново. Если запрос на списание не идемпотентен — деньги спишутся дважды.
Graceful shutdown даёт время, но не гарантирует завершённость. Идемпотентность — страховка на тот случай, когда времени не хватило.
Исходящий HTTP-запрос: Idempotency-Key
Самый распространённый случай — сервис вызывает внешний платёжный провайдер или другой сервис через HTTP. При повторном запросе сервер должен понять: «этот запрос я уже обработал».
Для этого используется заголовок Idempotency-Key — уникальный ключ, который клиент ставит на запрос. Сервер запоминает ключ и при повторе возвращает прежний результат, не выполняя операцию заново.
Главное правило: ключ генерируется один раз до всех повторных попыток, а не при каждой попытке.
// internal/adapters/out/payment/client.go
type ChargeCommand struct {
IdempotencyKey string
OrderID string
CustomerID string
AmountKopecks int64
}
func (c *Client) Charge(ctx context.Context, cmd ChargeCommand) error {
body, _ := json.Marshal(map[string]any{
"order_id": cmd.OrderID,
"customer_id": cmd.CustomerID,
"amount": cmd.AmountKopecks,
})
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/charges", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) // один ключ на всю операцию
resp, err := c.http.Do(req)
if err != nil {
return fmt.Errorf("charge request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("charge failed: status %d", resp.StatusCode)
}
return nil
}
Ключ формируется на уровне бизнес-логики — детерминированно, из данных самой операции:
// internal/usecase/confirm_order.go
func (uc *ConfirmOrderUseCase) Execute(ctx context.Context, cmd ConfirmOrderCommand) error {
order, err := uc.orders.FindByID(ctx, cmd.OrderID)
if err != nil {
return fmt.Errorf("find order: %w", err)
}
// Детерминированный ключ: один и тот же для одного и того же запроса
idempotencyKey := order.ID + ":charge:" + cmd.RequestID
return uc.payment.Charge(ctx, payment.ChargeCommand{
IdempotencyKey: idempotencyKey,
OrderID: order.ID,
CustomerID: order.CustomerID,
AmountKopecks: order.TotalKopecks,
})
}
Частая ошибка — вызывать uuid.New() при каждой попытке. Тогда каждый повторный запрос уходит с новым ключом, и провайдер считает его отдельной операцией.
Kafka-consumer: дедупликация через processed_event
kafka-go не коммитит offset автоматически — коммит делается явно через CommitMessages. Если сервис получил сообщение из Kafka, обработал его, но не успел закоммитить offset до SIGTERM — следующий запуск consumer прочитает то же сообщение снова.
Стандартное решение: таблица processed_event, запись в которую делается в одной транзакции с основным действием. Если сообщение пришло повторно — запись уже есть, и операцию пропускаем.
// internal/consumer/order_consumer.go
func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
var event OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
q := c.queries.WithTx(tx)
// Записываем факт обработки — в той же транзакции, что и само действие
inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
EventID: event.ID,
ConsumerID: "billing-service",
})
if err != nil {
return fmt.Errorf("insert processed event: %w", err)
}
if !inserted {
return nil // уже обработано — пропускаем
}
if err := c.billing.Charge(ctx, payment.ChargeCommand{
IdempotencyKey: event.ID + ":billing",
OrderID: event.OrderID,
CustomerID: event.CustomerID,
AmountKopecks: event.TotalKopecks,
}); err != nil {
return fmt.Errorf("charge order %s: %w", event.OrderID, err)
}
return tx.Commit(ctx)
}
SQL-запрос для дедупликации требует внимания: нельзя использовать ON CONFLICT DO NOTHING, потому что тогда RETURNING при конфликте вернёт 0 строк, что приведёт к ошибке pgx.ErrNoRows. Правильный вариант — DO UPDATE, который всегда возвращает строку:
-- name: InsertProcessedEvent :one
INSERT INTO processed_event (event_id, consumer_id, processed_at)
VALUES (@event_id, @consumer_id, now())
ON CONFLICT (event_id, consumer_id) DO UPDATE
SET processed_at = EXCLUDED.processed_at
RETURNING (xmax = 0) AS inserted;
При первой вставке xmax = 0 и возвращается inserted = true. При повторе строка обновляется, xmax становится ненулевым и возвращается inserted = false.
Outbox-relay: двухфазная публикация
Outbox — это таблица событий, которые нужно отправить в Kafka. Relay-горутина периодически читает из неё и публикует. Проблема: если сообщение в Kafka отправлено, но статус в таблице не обновлён до SIGTERM — relay отправит его снова.
Есть два подхода.
Если все downstream-consumer реализуют processed_event — relay может публиковать дубли, consumer их отфильтрует. Это проще:
// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
// context.Background() — чтобы SIGTERM не прервал commit уже начатой транзакции.
// Новые batch не начинаются благодаря проверке ctx.Done() в Run.
bgCtx := context.Background()
tx, err := r.pool.Begin(bgCtx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(bgCtx)
q := r.queries.WithTx(tx)
events, err := q.LockPendingOutboxBatch(bgCtx, r.batchSize) // FOR UPDATE SKIP LOCKED
if err != nil {
return fmt.Errorf("lock outbox batch: %w", err)
}
for _, e := range events {
if err := r.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(e.AggregateID),
Value: e.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(e.ID)},
},
}); err != nil {
return fmt.Errorf("write outbox event %s: %w", e.ID, err)
}
if err := q.MarkOutboxDispatched(bgCtx, e.ID); err != nil {
return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
}
}
return tx.Commit(bgCtx)
}
Если downstream-consumer не контролируется — нужна явная двухфазная публикация через статусы PENDING → PUBLISHING → PUBLISHED:
-- Перевод в PUBLISHING атомарно (FOR UPDATE SKIP LOCKED)
-- name: LockAndMarkPublishing :many
UPDATE outbox_event
SET status = 'PUBLISHING', locked_at = now()
WHERE id IN (
SELECT id FROM outbox_event
WHERE status = 'PENDING'
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED
)
RETURNING *;
-- Перевод в PUBLISHED после успешной отправки
-- name: MarkPublished :exec
UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = @id;
Если SIGTERM пришёл между отправкой и MarkPublished, строки зависают в PUBLISHING. Cleanup-горутина через настраиваемый TTL возвращает их в PENDING:
func (c *OutboxCleanup) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := c.queries.ResetStuckPublishing(ctx, db.ResetStuckPublishingParams{
StuckAfter: pgtype.Interval{Microseconds: int64(c.stuckAfter / time.Microsecond), Valid: true},
}); err != nil {
slog.WarnContext(ctx, "outbox cleanup failed", "error", err)
}
}
}
}
-- name: ResetStuckPublishing :exec
UPDATE outbox_event
SET status = 'PENDING', locked_at = NULL
WHERE status = 'PUBLISHING'
AND locked_at < now() - @stuck_after::interval;
context.Background() в relay — зачем
Relay-горутина получает отменяемый контекст, который отменяется при SIGTERM. Но если передавать этот же контекст в pgx-транзакцию — SIGTERM отменит транзакцию прямо в момент коммита, и пачка откатится.
Решение: использовать context.Background() для транзакции внутри пачки, а проверку отмены делать между итерациями:
func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return // не начинаем новый batch
case <-ticker.C:
}
batchCtx, cancel := context.WithTimeout(context.Background(), r.batchTimeout)
if err := r.processOneBatch(batchCtx); err != nil {
slog.WarnContext(ctx, "outbox relay batch", "error", err)
}
cancel()
}
}
ctx.Done() — сигнал остановить цикл. context.Background() для пачки — чтобы уже начатая транзакция могла закоммититься.
Коротко
- Graceful shutdown даёт время завершить операции, но не гарантирует это. Операции должны быть безопасны для повтора — это и есть идемпотентность.
- Для исходящего HTTP POST: ставить заголовок
Idempotency-Keyв адаптере. Ключ формируется детерминированно один раз на бизнес-операцию, а не генерируется заново при каждой попытке. - Для Kafka-consumer: записывать факт обработки в таблицу
processed_eventв одной pgx-транзакции с основным действием. SQL —ON CONFLICT DO UPDATE ... RETURNING (xmax = 0) AS inserted(неDO NOTHING— при конфликтеDO NOTHINGне возвращает строку). - Для outbox-relay: если downstream не контролируется — двухфазный статус
PENDING → PUBLISHING → PUBLISHEDплюс cleanup-горутина для зависших записей. - В relay-горутине использовать
context.Background()для pgx-транзакции внутри пачки, а проверку отмены делать между итерациями черезselect { case <-ctx.Done(): return }.
Что почитать дальше
- HTTP drain в Go — как
http.Server.Shutdownдожидается запросов и что делать с долгими эндпоинтами. - Kafka shutdown в Go — consumer через context cancellation,
writer.Close(). - Фоновые задачи и outbox в Go —
sync.WaitGroup, outbox-relay цикл, cleanup-горутина. - БД и persistence в Go — порядок shutdown-последовательности,
pgxpool.Pool.Close()последним.