Опирается на правила:
R-DIST-IDEM-1…R-DIST-IDEM-5иR-DIST-IDEM-X1…R-DIST-IDEM-X3из Distributed Patterns Style Guide → раздел 3. Idempotency.
Важно знать
- Распределённая система всегда at-least-once — retry, rebalance, network timeout приводят к дублям. Receiver обязан быть идемпотентным.
- Каждое cross-service сообщение имеет уникальный ID: Kafka —
event_idUUID v4/v7 в заголовке, HTTP money —Idempotency-Keyheader, шаг саги —saga_id + step.- Проверка и запись
processed_event— в однойpgx.Txс бизнес-обновлением. Проверка вне транзакции — race condition.- HTTP-команды — middleware хранит
(idempotency_key, command_hash, status_code, response_body): повтор возвращает сохранённый ответ, конфликт ключа с другим телом →409 Conflict.- Money — двойная защита:
Idempotency-Keyот клиента + UNIQUE(provider_id, external_payment_id)в БД.- TTL 24–72 часа для idempotency-records: короче — реальный retry не проходит; длиннее — таблица растёт, autovacuum нагружен.
- Producer тоже exactly-once: kafka-go
RequiredAcks: -1+Balancer: &kafka.Hash{}. Receiver-side dedup в одиночку недостаточен.- В Go нет аннотационной магии (
@Transactional): транзакция — явныйpgx.Tx, пробрасываемый черезqueries.WithTx(tx).
В распределённой системе нет гарантии «ровно один раз». Сеть теряет ACK, Kafka rebalance отдаёт то же сообщение новому consumer-у, HTTP retry после таймаута попадает на уже выполненную команду. Единственный рабочий способ — receiver проверяет, не обработал ли он уже это сообщение, и при повторе возвращает тот же результат.
Уникальный ID на каждое сообщение
R-DIST-IDEM-1: каждое cross-service сообщение несёт уникальный ID.
| Транспорт | ID | Источник |
|---|---|---|
| Kafka event | event_id в заголовке, UUID v4/v7 | producer генерирует |
| HTTP money command | Idempotency-Key header | клиент генерирует |
| Шаг саги | saga_id + step_name | orchestrator |
UUID v7 монотонно возрастает (timestamp в старших битах) — последовательные вставки в processed_event не фрагментируют B-tree индекс PG.
Kafka-producer выставляет event_id в заголовок при отправке:
// internal/outbox/relay.go
kafka.Message{
Topic: m.Topic,
Key: []byte(m.Key),
Value: m.Payload,
Headers: []kafka.Header{
{Key: "event_id", Value: []byte(m.MessageID.String())},
{Key: "saga_id", Value: []byte(m.SagaID.String())},
},
}
Processed-events для Kafka consumer
R-DIST-IDEM-2: receiver хранит event_id обработанных сообщений в таблице processed_event и проверяет перед обработкой в той же транзакции. Kafka rebalance отдаёт сообщение другому consumer-у — без dedup команда выполнится дважды.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
topic text NOT NULL,
consumer_name text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
expires_at timestamptz NOT NULL
);
CREATE INDEX ix_processed_event_expires_at ON processed_event (expires_at);
sqlc-запрос проверяет существование и вставляет атомарно:
-- query.sql
-- name: ExistsProcessedEvent :one
SELECT EXISTS (
SELECT 1 FROM processed_event WHERE event_id = $1
);
-- name: InsertProcessedEvent :exec
INSERT INTO processed_event (event_id, topic, consumer_name, expires_at)
VALUES ($1, $2, $3, $4);
Consumer проверяет и вставляет в одной транзакции:
// adapters/in/kafka/order_consumer.go
func headerVal(headers []kafka.Header, key string) string {
for _, h := range headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}
func (c *OrderConsumer) handleMessage(ctx context.Context, tx pgx.Tx, msg kafka.Message) error {
eventID := headerVal(msg.Headers, "event_id")
qtx := c.queries.WithTx(tx)
exists, err := qtx.ExistsProcessedEvent(ctx, eventID)
if err != nil {
return fmt.Errorf("check processed event: %w", err)
}
if exists {
return nil
}
if err := c.processOrderEvent(ctx, qtx, msg); err != nil {
return err
}
return qtx.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
EventID: eventID,
Topic: msg.Topic,
ConsumerName: "order-service",
ExpiresAt: time.Now().Add(72 * time.Hour),
})
}
Вся логика — ExistsProcessedEvent → processOrderEvent → InsertProcessedEvent — в одной pgx.Tx. Если бизнес-операция упала после проверки но до записи processed_event — транзакция откатится целиком, следующий retry обработает корректно.
Idempotency middleware для HTTP-команд
R-DIST-IDEM-3: HTTP-команды (особенно money) хранят (idempotency_key, command_hash, status_code, response_body). Повтор с тем же ключом и тем же телом → сохранённый response. Тот же ключ, другое тело → 409 Conflict.
CREATE TABLE idempotency_record (
idempotency_key text PRIMARY KEY,
command_hash text NOT NULL,
status_code int NOT NULL,
response_body bytea NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
expires_at timestamptz NOT NULL
);
CREATE INDEX ix_idempotency_record_expires_at ON idempotency_record (expires_at);
Middleware перехватывает запрос до handler-а:
// adapters/in/http/middleware/idempotency.go
package middleware
type ConflictingKeyError struct{ Key string }
func (e *ConflictingKeyError) Error() string {
return fmt.Sprintf("idempotency key %q used with different command", e.Key)
}
type IdempotencyMiddleware struct {
queries *db.Queries
pool *pgxpool.Pool
}
func NewIdempotencyMiddleware(queries *db.Queries, pool *pgxpool.Pool) *IdempotencyMiddleware {
return &IdempotencyMiddleware{queries: queries, pool: pool}
}
func (m *IdempotencyMiddleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := r.Header.Get("Idempotency-Key")
if key == "" {
next.ServeHTTP(w, r)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
httperr.Write(w, r, fmt.Errorf("read body: %w", err))
return
}
r.Body = io.NopCloser(bytes.NewReader(body))
commandHash := sha256Hex(body)
ctx := r.Context()
existing, err := m.queries.FindIdempotencyRecord(ctx, key)
if err == nil {
if existing.CommandHash != commandHash {
httperr.Write(w, r, &ConflictingKeyError{Key: key})
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(int(existing.StatusCode))
_, _ = w.Write(existing.ResponseBody)
return
}
if !errors.Is(err, pgx.ErrNoRows) {
httperr.Write(w, r, fmt.Errorf("idempotency lookup: %w", err))
return
}
rec := &responseRecorder{ResponseWriter: w, buf: &bytes.Buffer{}, status: http.StatusOK}
next.ServeHTTP(rec, r)
tx, txErr := m.pool.Begin(ctx)
if txErr != nil {
return
}
defer tx.Rollback(ctx)
_ = m.queries.WithTx(tx).InsertIdempotencyRecord(ctx, db.InsertIdempotencyRecordParams{
IdempotencyKey: key,
CommandHash: commandHash,
StatusCode: int32(rec.status),
ResponseBody: rec.buf.Bytes(),
ExpiresAt: time.Now().Add(48 * time.Hour),
})
_ = tx.Commit(ctx)
})
}
func sha256Hex(b []byte) string {
sum := sha256.Sum256(b)
return hex.EncodeToString(sum[:])
}
type responseRecorder struct {
http.ResponseWriter
buf *bytes.Buffer
status int
}
func (r *responseRecorder) WriteHeader(code int) {
r.status = code
r.ResponseWriter.WriteHeader(code)
}
func (r *responseRecorder) Write(b []byte) (int, error) {
r.buf.Write(b)
return r.ResponseWriter.Write(b)
}
Подключение в chi-роутере только для money-эндпоинтов:
// adapters/in/http/router.go
r.Route("/payments", func(r chi.Router) {
r.Use(idempotencyMW.Handler)
r.Post("/", paymentHandler.Charge)
r.Post("/{id}/refund", paymentHandler.Refund)
})
Три случая:
- Ключ не встречался — обрабатываем команду, сохраняем результат.
- Ключ + та же команда — возвращаем сохранённый response, клиент не списан дважды.
- Ключ + другая команда —
409 Conflict, клиент переиспользовал ключ.
Двойная защита для money
R-DIST-IDEM-4: money-операции защищаются дважды: Idempotency-Key от клиента + UNIQUE constraint в БД.
CREATE TABLE payment (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
order_id uuid NOT NULL,
customer_id uuid NOT NULL,
provider text NOT NULL,
external_payment_id text NOT NULL,
amount numeric(19,4) NOT NULL,
status text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (provider, external_payment_id)
);
Handler ловит нарушение constraint и возвращает существующий payment:
// core/payment/handler.go
func (h *ChargePaymentHandler) Handle(ctx context.Context, tx pgx.Tx, cmd ChargePaymentCommand) (*Payment, error) {
qtx := h.queries.WithTx(tx)
p, err := qtx.InsertPayment(ctx, db.InsertPaymentParams{
OrderID: cmd.OrderID,
CustomerID: cmd.CustomerID,
Provider: cmd.Provider,
ExternalPaymentID: cmd.ExternalPaymentID,
Amount: cmd.Amount,
Status: "pending",
})
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation {
return qtx.FindPaymentByProviderAndExternal(ctx, db.FindPaymentByProviderAndExternalParams{
Provider: cmd.Provider,
ExternalPaymentID: cmd.ExternalPaymentID,
})
}
return nil, fmt.Errorf("insert payment: %w", err)
}
return p, nil
}
Idempotency-Key защищает от retry на уровне HTTP; unique constraint — от дублей, которые пришли с разными ключами (другой клиент, другой retry-механизм). Два слоя вместе закрывают оба класса инцидентов.
TTL для idempotency-records
R-DIST-IDEM-5: записи держать 24–72 часа.
- Меньше 24 часов — retry клиента через сутки после network outage не проходит дедупликацию → дважды списаны деньги.
- Больше 72 часов — таблица растёт, индекс фрагментируется, autovacuum перегружен.
Очистка — фоновая горутина в errgroup:
// internal/cleanup/idempotency_cleaner.go
type IdempotencyRecordCleaner struct {
queries *db.Queries
}
func (c *IdempotencyRecordCleaner) Run(ctx context.Context) error {
ticker := time.NewTicker(6 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
n, err := c.queries.DeleteExpiredIdempotencyRecords(ctx)
if err != nil {
slog.ErrorContext(ctx, "idempotency cleanup failed", "error", err)
continue
}
if n > 0 {
slog.InfoContext(ctx, "idempotency records cleaned", "count", n)
}
}
}
}
Запуск в main.go через errgroup:
g.Go(func() error { return cleaner.Run(gCtx) })
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Receiver без dedup для money/critical-команд | R-DIST-IDEM-X1 | processed_event + Idempotency-Key middleware |
| Только receiver-side dedup без producer exactly-once | R-DIST-IDEM-X2 | kafka-go RequiredAcks: -1 + Balancer: &kafka.Hash{} |
| Новый UUID при каждом retry на клиенте | R-DIST-IDEM-X3 | один ключ на бизнес-операцию, retry с тем же |
Проверка processed_event вне транзакции | R-DIST-IDEM-2 | queries.WithTx(tx) — проверка и запись в одной pgx.Tx |
| TTL idempotency < 24 часов | R-DIST-IDEM-5 | 24–72 часа + очистка по expires_at |
| Один слой защиты для money | R-DIST-IDEM-4 | client key + UNIQUE (provider, external_payment_id) в БД |
Куда дальше
- Outbox + Inbox — outbox обеспечивает producer-side гарантии at-least-once.
- Saga — каждый шаг саги обязан быть идемпотентным;
saga_idв каждом сообщении. - Compensation — compensation тоже идемпотентен: проверка по
(saga_id, step). - Eventual consistency — causal consistency через
event.Version > current_version. - Distributed Transactions — чего не делать — почему 2PC не работает в Go-стеке.
- When to use distributed patterns — когда вводить паттерны, когда достаточно одной
pgx.Tx.