Опирается на правила:
R-SHUT-IDEM-1,R-SHUT-IDEM-X1из контракта Graceful Shutdown → раздел 7. Идемпотентность in-flight.
Важно знать
- Операции, которые SIGTERM может прервать, обязаны быть retry-safe — graceful shutdown даёт время, но не гарантирует отсутствие partial (
R-SHUT-IDEM-1).- В Go нет фреймворкового
@Transactional; транзакция передаётся явно черезcontext.Contextили параметром — ответственность за атомарность processed_event + side-effect лежит на вас.- HTTP-адаптер выходящего вызова обязан ставить
Idempotency-Keyзаголовок один раз на бизнес-операцию — до всех retry; новый UUID на каждый retry создаёт дубль.kafka-goне коммитит offset автоматически;CommitMessagesвызывается явно — это защита от потери, но не от дублей при SIGTERM в окне обработки.- processed_event через
INSERT ... ON CONFLICT (event_id) DO NOTHINGв одной pgx-транзакции с side-effect — достаточная защита для Kafka-handler.- Outbox-relay: Kafka-send прошёл, UPDATE
published_atне дошёл → следующий relay пошлёт снова; защита — receiver-side dedup или двух-фазаPENDING → PUBLISHING → PUBLISHED.- Money-cascade через
http.Clientс retry безIdempotency-Key— SIGTERM в момент первого запроса, новый pod шлёт повторно, provider обрабатывает оба (R-SHUT-IDEM-X1).- Идемпотентность — последняя линия защиты; graceful shutdown (
http.Server.Shutdown,WaitGroup, context cancellation) — первая.
Graceful shutdown в Go управляется явно: os.Signal канал, context.WithCancel, http.Server.Shutdown(ctx), sync.WaitGroup для фоновых горутин. http.Server.Shutdown дожидается in-flight HTTP-запросов — большинство операций успевает завершиться в штатный таймаут 25–30 s. Но долгий cascade (HTTP с retry × 3 × 10 s) может не уложиться, а SIGKILL приходит через 60 s. Если операция не retry-safe — partial state.
Три типа in-flight операций
R-SHUT-IDEM-1 разбивает защиту по контексту: исходящий HTTP, Kafka-handler, outbox-relay.
1. Исходящий HTTP POST — Idempotency-Key в адаптере
Ключ генерируется до всех retry и передаётся адаптером наружу. В Go retry-логика часто встроена в transport-обёртку или реализуется вручную — в обоих случаях ключ фиксируется раз на бизнес-операцию.
// internal/adapters/out/payment/client.go
type ChargeCommand struct {
IdempotencyKey string
OrderID string
CustomerID string
AmountKopecks int64
}
func (c *Client) Charge(ctx context.Context, cmd ChargeCommand) error {
body, err := json.Marshal(map[string]any{
"order_id": cmd.OrderID,
"customer_id": cmd.CustomerID,
"amount": cmd.AmountKopecks,
})
if err != nil {
return fmt.Errorf("marshal charge request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/charges", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("new charge request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) // R-SHUT-IDEM-1
resp, err := c.http.Do(req)
if err != nil {
return fmt.Errorf("charge request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("charge failed: status %d", resp.StatusCode)
}
return nil
}
Генерация ключа — на уровне use case, один раз:
// internal/usecase/confirm_order.go
func (uc *ConfirmOrderUseCase) Execute(ctx context.Context, cmd ConfirmOrderCommand) error {
order, err := uc.orders.FindByID(ctx, cmd.OrderID)
if err != nil {
return fmt.Errorf("find order: %w", err)
}
idempotencyKey := order.ID + ":charge:" + cmd.RequestID // детерминированный, не uuid.New()
return uc.payment.Charge(ctx, payment.ChargeCommand{
IdempotencyKey: idempotencyKey,
OrderID: order.ID,
CustomerID: order.CustomerID,
AmountKopecks: order.TotalKopecks,
})
}
Детерминированный ключ (orderID + ":" + requestID) — идемпотентен при повторной обработке того же события. uuid.New() внутри retry создаёт уникальный ключ на каждый attempt — дубль.
2. Kafka-handler — processed_event в одной pgx-транзакции
kafka-go вызывает CommitMessages явно после обработки. При SIGTERM в окне между обработкой и коммитом offset consumer перечитает сообщение. processed_event в той же транзакции что side-effect — атомарная защита: либо оба закоммичены, либо оба откатились.
// internal/consumer/order_consumer.go
func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
var event OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal order confirmed: %w", err)
}
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
q := c.queries.WithTx(tx)
inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
EventID: event.ID,
ConsumerID: "billing-service",
})
if err != nil {
return fmt.Errorf("insert processed event: %w", err)
}
if !inserted {
return nil // дубль — уже обработали
}
if err := c.billing.Charge(ctx, payment.ChargeCommand{
IdempotencyKey: event.ID + ":billing",
OrderID: event.OrderID,
CustomerID: event.CustomerID,
AmountKopecks: event.TotalKopecks,
}); err != nil {
return fmt.Errorf("charge order %s: %w", event.OrderID, err)
}
return tx.Commit(ctx)
}
SQL-запрос для дедупликации:
-- query.sql (sqlc)
-- name: InsertProcessedEvent :one
INSERT INTO processed_event (event_id, consumer_id, processed_at)
VALUES (@event_id, @consumer_id, now())
ON CONFLICT (event_id, consumer_id) DO NOTHING
RETURNING (xmax = 0) AS inserted;
xmax = 0 возвращает true только когда строка была реально вставлена, false при конфликте. Так handler узнаёт без отдельного SELECT, нужна ли обработка.
3. Outbox-relay — двух-фаза или receiver-side dedup
Relay публикует события из таблицы outbox в Kafka. Если Kafka-send прошёл, а UPDATE published_at нет — следующий запуск relay пошлёт то же событие повторно.
Вариант A: receiver-side dedup (проще)
Если все downstream consumer'ы реализуют processed_event (как выше) — relay может публиковать дубли, consumer'ы их поглотят. Relay проще — нет состояний publishing:
// internal/scheduler/outbox_relay.go
func (r *OutboxRelay) processOneBatch(ctx context.Context) error {
tx, err := r.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
q := r.queries.WithTx(tx)
events, err := q.LockPendingOutboxBatch(ctx, r.batchSize) // FOR UPDATE SKIP LOCKED
if err != nil {
return fmt.Errorf("lock outbox batch: %w", err)
}
for _, e := range events {
if err := r.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(e.AggregateID),
Value: e.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(e.ID)},
},
}); err != nil {
return fmt.Errorf("write outbox event %s: %w", e.ID, err)
}
if err := q.MarkOutboxDispatched(ctx, e.ID); err != nil {
return fmt.Errorf("mark dispatched %s: %w", e.ID, err)
}
}
return tx.Commit(ctx)
}
Вариант B: двух-фаза (когда downstream dedup недоступен)
-- Статусы: PENDING → PUBLISHING → PUBLISHED
ALTER TABLE outbox_event ADD COLUMN status text NOT NULL DEFAULT 'PENDING';
-- name: LockAndMarkPublishing :many
UPDATE outbox_event
SET status = 'PUBLISHING', locked_at = now()
WHERE id IN (
SELECT id FROM outbox_event
WHERE status = 'PENDING'
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED
)
RETURNING *;
-- name: MarkPublished :exec
UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = @id;
При SIGTERM между send и MarkPublished строки остаются в PUBLISHING. Cleanup-горутина через настраиваемый TTL (например, 5 минут) возвращает зависшие строки в PENDING:
// internal/scheduler/outbox_cleanup.go
func (c *OutboxCleanup) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := c.queries.ResetStuckPublishing(ctx, db.ResetStuckPublishingParams{
StuckAfter: pgtype.Interval{Microseconds: int64(c.stuckAfter / time.Microsecond), Valid: true},
}); err != nil {
slog.WarnContext(ctx, "outbox cleanup failed", "error", err)
}
}
}
}
-- name: ResetStuckPublishing :exec
UPDATE outbox_event
SET status = 'PENDING', locked_at = NULL
WHERE status = 'PUBLISHING'
AND locked_at < now() - @stuck_after::interval;
Граничный случай: транзакция outbox на context.Background()
Relay-горутина управляется отменяемым контекстом. При SIGTERM контекст отменяется — processOneBatch должна завершить текущий batch, не начинать новый. Критичная секция использует context.Background() для pgx-транзакции, чтобы SIGTERM не прервал commit:
func (r *OutboxRelay) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return // не начинаем новый batch
case <-ticker.C:
}
// processOneBatch получает context.Background() для tx — не прерываем commit
batchCtx, cancel := context.WithTimeout(context.Background(), r.batchTimeout)
if err := r.processOneBatch(batchCtx); err != nil {
slog.WarnContext(ctx, "outbox relay batch", "error", err)
}
cancel()
}
}
ctx.Done() — сигнал «не начинать новый batch». context.Background() для самого batch — чтобы pgx-транзакция не получила cancel в середине commit.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
uuid.New() внутри retry-цикла как Idempotency-Key | R-SHUT-IDEM-X1 | детерминированный ключ на бизнес-операцию: orderID + ":" + requestID |
Money через http.Client без Idempotency-Key заголовка | R-SHUT-IDEM-X1 | req.Header.Set("Idempotency-Key", cmd.IdempotencyKey) в адаптере |
InsertProcessedEvent отдельным запросом вне pgx-транзакции с side-effect | R-SHUT-IDEM-1 | один tx, оба действия внутри |
CommitMessages до завершения side-effect в Kafka-handler | R-SHUT-IDEM-1 | CommitMessages после tx.Commit |
Outbox-relay processOneBatch с отменяемым контекстом для pgx-транзакции | R-SHUT-IDEM-1 | context.Background() + отдельный timeout для критичной секции |
for { processOneBatch(ctx) } без проверки ctx.Done() между итерациями | R-SHUT-SCHED-3 | select { case <-ctx.Done(): return; case <-ticker.C: } |
| Kafka-handler без processed_event при возможном replay offset | R-SHUT-IDEM-1 | ON CONFLICT (event_id, consumer_id) DO NOTHING в одной транзакции |
Куда дальше
- Бюджеты и observability — метрика
app_shutdown_duration_seconds, cumulative-бюджет для Go-стека. - БД и persistence —
pgxpool.Pool.Close()последним, порядок shutdown-последовательности. - HTTP drain —
http.Server.ShutdownvsClose, preStop hook, 202+polling для долгих эндпоинтов. - Конфигурация shutdown —
os.Signalканал,context.WithTimeout,health.Stateсatomic.Bool. - Kafka shutdown —
kafka-goconsumer через context cancellation,writer.Close(). - Kubernetes —
terminationGracePeriodSeconds: 60, preStop, probes. - Фоновые задачи и outbox —
sync.WaitGroup, outbox-relay цикл, cleanup-горутина.