Опирается на правила: R-DIST-IDEM-1R-DIST-IDEM-5 и R-DIST-IDEM-X1R-DIST-IDEM-X3 из Distributed Patterns Style Guide → раздел 3. Idempotency.

Важно знать

  • Распределённая система всегда at-least-once — retry, rebalance, network timeout приводят к дублям. Receiver обязан быть идемпотентным.
  • Каждое cross-service сообщение имеет уникальный ID: Kafka — event_id UUID v4/v7 в заголовке, HTTP money — Idempotency-Key header, шаг саги — saga_id + step.
  • Проверка и запись processed_event — в одной pgx.Tx с бизнес-обновлением. Проверка вне транзакции — race condition.
  • HTTP-команды — middleware хранит (idempotency_key, command_hash, status_code, response_body): повтор возвращает сохранённый ответ, конфликт ключа с другим телом → 409 Conflict.
  • Money — двойная защита: Idempotency-Key от клиента + UNIQUE (provider_id, external_payment_id) в БД.
  • TTL 24–72 часа для idempotency-records: короче — реальный retry не проходит; длиннее — таблица растёт, autovacuum нагружен.
  • Producer тоже exactly-once: kafka-go RequiredAcks: -1 + Balancer: &kafka.Hash{}. Receiver-side dedup в одиночку недостаточен.
  • В Go нет аннотационной магии (@Transactional): транзакция — явный pgx.Tx, пробрасываемый через queries.WithTx(tx).

В распределённой системе нет гарантии «ровно один раз». Сеть теряет ACK, Kafka rebalance отдаёт то же сообщение новому consumer-у, HTTP retry после таймаута попадает на уже выполненную команду. Единственный рабочий способ — receiver проверяет, не обработал ли он уже это сообщение, и при повторе возвращает тот же результат.

Уникальный ID на каждое сообщение

R-DIST-IDEM-1: каждое cross-service сообщение несёт уникальный ID.

ТранспортIDИсточник
Kafka eventevent_id в заголовке, UUID v4/v7producer генерирует
HTTP money commandIdempotency-Key headerклиент генерирует
Шаг сагиsaga_id + step_nameorchestrator

UUID v7 монотонно возрастает (timestamp в старших битах) — последовательные вставки в processed_event не фрагментируют B-tree индекс PG.

Kafka-producer выставляет event_id в заголовок при отправке:

// internal/outbox/relay.go
kafka.Message{
    Topic: m.Topic,
    Key:   []byte(m.Key),
    Value: m.Payload,
    Headers: []kafka.Header{
        {Key: "event_id", Value: []byte(m.MessageID.String())},
        {Key: "saga_id",  Value: []byte(m.SagaID.String())},
    },
}

Processed-events для Kafka consumer

R-DIST-IDEM-2: receiver хранит event_id обработанных сообщений в таблице processed_event и проверяет перед обработкой в той же транзакции. Kafka rebalance отдаёт сообщение другому consumer-у — без dedup команда выполнится дважды.

CREATE TABLE processed_event (
    event_id      uuid        PRIMARY KEY,
    topic         text        NOT NULL,
    consumer_name text        NOT NULL,
    created_at    timestamptz NOT NULL DEFAULT now(),
    expires_at    timestamptz NOT NULL
);
CREATE INDEX ix_processed_event_expires_at ON processed_event (expires_at);

sqlc-запрос проверяет существование и вставляет атомарно:

-- query.sql
-- name: ExistsProcessedEvent :one
SELECT EXISTS (
    SELECT 1 FROM processed_event WHERE event_id = $1
);

-- name: InsertProcessedEvent :exec
INSERT INTO processed_event (event_id, topic, consumer_name, expires_at)
VALUES ($1, $2, $3, $4);

Consumer проверяет и вставляет в одной транзакции:

// adapters/in/kafka/order_consumer.go
func headerVal(headers []kafka.Header, key string) string {
    for _, h := range headers {
        if h.Key == key {
            return string(h.Value)
        }
    }
    return ""
}

func (c *OrderConsumer) handleMessage(ctx context.Context, tx pgx.Tx, msg kafka.Message) error {
    eventID := headerVal(msg.Headers, "event_id")
    qtx := c.queries.WithTx(tx)

    exists, err := qtx.ExistsProcessedEvent(ctx, eventID)
    if err != nil {
        return fmt.Errorf("check processed event: %w", err)
    }
    if exists {
        return nil
    }

    if err := c.processOrderEvent(ctx, qtx, msg); err != nil {
        return err
    }

    return qtx.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:      eventID,
        Topic:        msg.Topic,
        ConsumerName: "order-service",
        ExpiresAt:    time.Now().Add(72 * time.Hour),
    })
}

Вся логика — ExistsProcessedEvent → processOrderEvent → InsertProcessedEvent — в одной pgx.Tx. Если бизнес-операция упала после проверки но до записи processed_event — транзакция откатится целиком, следующий retry обработает корректно.

Idempotency middleware для HTTP-команд

R-DIST-IDEM-3: HTTP-команды (особенно money) хранят (idempotency_key, command_hash, status_code, response_body). Повтор с тем же ключом и тем же телом → сохранённый response. Тот же ключ, другое тело → 409 Conflict.

CREATE TABLE idempotency_record (
    idempotency_key text        PRIMARY KEY,
    command_hash    text        NOT NULL,
    status_code     int         NOT NULL,
    response_body   bytea       NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    expires_at      timestamptz NOT NULL
);
CREATE INDEX ix_idempotency_record_expires_at ON idempotency_record (expires_at);

Middleware перехватывает запрос до handler-а:

// adapters/in/http/middleware/idempotency.go
package middleware

type ConflictingKeyError struct{ Key string }

func (e *ConflictingKeyError) Error() string {
    return fmt.Sprintf("idempotency key %q used with different command", e.Key)
}

type IdempotencyMiddleware struct {
    queries *db.Queries
    pool    *pgxpool.Pool
}

func NewIdempotencyMiddleware(queries *db.Queries, pool *pgxpool.Pool) *IdempotencyMiddleware {
    return &IdempotencyMiddleware{queries: queries, pool: pool}
}

func (m *IdempotencyMiddleware) Handler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        key := r.Header.Get("Idempotency-Key")
        if key == "" {
            next.ServeHTTP(w, r)
            return
        }

        body, err := io.ReadAll(r.Body)
        if err != nil {
            httperr.Write(w, r, fmt.Errorf("read body: %w", err))
            return
        }
        r.Body = io.NopCloser(bytes.NewReader(body))
        commandHash := sha256Hex(body)

        ctx := r.Context()
        existing, err := m.queries.FindIdempotencyRecord(ctx, key)
        if err == nil {
            if existing.CommandHash != commandHash {
                httperr.Write(w, r, &ConflictingKeyError{Key: key})
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(int(existing.StatusCode))
            _, _ = w.Write(existing.ResponseBody)
            return
        }
        if !errors.Is(err, pgx.ErrNoRows) {
            httperr.Write(w, r, fmt.Errorf("idempotency lookup: %w", err))
            return
        }

        rec := &responseRecorder{ResponseWriter: w, buf: &bytes.Buffer{}, status: http.StatusOK}
        next.ServeHTTP(rec, r)

        tx, txErr := m.pool.Begin(ctx)
        if txErr != nil {
            return
        }
        defer tx.Rollback(ctx)
        _ = m.queries.WithTx(tx).InsertIdempotencyRecord(ctx, db.InsertIdempotencyRecordParams{
            IdempotencyKey: key,
            CommandHash:    commandHash,
            StatusCode:     int32(rec.status),
            ResponseBody:   rec.buf.Bytes(),
            ExpiresAt:      time.Now().Add(48 * time.Hour),
        })
        _ = tx.Commit(ctx)
    })
}

func sha256Hex(b []byte) string {
    sum := sha256.Sum256(b)
    return hex.EncodeToString(sum[:])
}

type responseRecorder struct {
    http.ResponseWriter
    buf    *bytes.Buffer
    status int
}

func (r *responseRecorder) WriteHeader(code int) {
    r.status = code
    r.ResponseWriter.WriteHeader(code)
}

func (r *responseRecorder) Write(b []byte) (int, error) {
    r.buf.Write(b)
    return r.ResponseWriter.Write(b)
}

Подключение в chi-роутере только для money-эндпоинтов:

// adapters/in/http/router.go
r.Route("/payments", func(r chi.Router) {
    r.Use(idempotencyMW.Handler)
    r.Post("/", paymentHandler.Charge)
    r.Post("/{id}/refund", paymentHandler.Refund)
})

Три случая:

  • Ключ не встречался — обрабатываем команду, сохраняем результат.
  • Ключ + та же команда — возвращаем сохранённый response, клиент не списан дважды.
  • Ключ + другая команда409 Conflict, клиент переиспользовал ключ.

Двойная защита для money

R-DIST-IDEM-4: money-операции защищаются дважды: Idempotency-Key от клиента + UNIQUE constraint в БД.

CREATE TABLE payment (
    id                  bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    order_id            uuid        NOT NULL,
    customer_id         uuid        NOT NULL,
    provider            text        NOT NULL,
    external_payment_id text        NOT NULL,
    amount              numeric(19,4) NOT NULL,
    status              text        NOT NULL,
    created_at          timestamptz NOT NULL DEFAULT now(),
    UNIQUE (provider, external_payment_id)
);

Handler ловит нарушение constraint и возвращает существующий payment:

// core/payment/handler.go
func (h *ChargePaymentHandler) Handle(ctx context.Context, tx pgx.Tx, cmd ChargePaymentCommand) (*Payment, error) {
    qtx := h.queries.WithTx(tx)
    p, err := qtx.InsertPayment(ctx, db.InsertPaymentParams{
        OrderID:           cmd.OrderID,
        CustomerID:        cmd.CustomerID,
        Provider:          cmd.Provider,
        ExternalPaymentID: cmd.ExternalPaymentID,
        Amount:            cmd.Amount,
        Status:            "pending",
    })
    if err != nil {
        var pgErr *pgconn.PgError
        if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation {
            return qtx.FindPaymentByProviderAndExternal(ctx, db.FindPaymentByProviderAndExternalParams{
                Provider:          cmd.Provider,
                ExternalPaymentID: cmd.ExternalPaymentID,
            })
        }
        return nil, fmt.Errorf("insert payment: %w", err)
    }
    return p, nil
}

Idempotency-Key защищает от retry на уровне HTTP; unique constraint — от дублей, которые пришли с разными ключами (другой клиент, другой retry-механизм). Два слоя вместе закрывают оба класса инцидентов.

TTL для idempotency-records

R-DIST-IDEM-5: записи держать 24–72 часа.

  • Меньше 24 часов — retry клиента через сутки после network outage не проходит дедупликацию → дважды списаны деньги.
  • Больше 72 часов — таблица растёт, индекс фрагментируется, autovacuum перегружен.

Очистка — фоновая горутина в errgroup:

// internal/cleanup/idempotency_cleaner.go
type IdempotencyRecordCleaner struct {
    queries *db.Queries
}

func (c *IdempotencyRecordCleaner) Run(ctx context.Context) error {
    ticker := time.NewTicker(6 * time.Hour)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            n, err := c.queries.DeleteExpiredIdempotencyRecords(ctx)
            if err != nil {
                slog.ErrorContext(ctx, "idempotency cleanup failed", "error", err)
                continue
            }
            if n > 0 {
                slog.InfoContext(ctx, "idempotency records cleaned", "count", n)
            }
        }
    }
}

Запуск в main.go через errgroup:

g.Go(func() error { return cleaner.Run(gCtx) })

Что запрещено

АнтипаттернПравилоЧто взамен
Receiver без dedup для money/critical-командR-DIST-IDEM-X1processed_event + Idempotency-Key middleware
Только receiver-side dedup без producer exactly-onceR-DIST-IDEM-X2kafka-go RequiredAcks: -1 + Balancer: &kafka.Hash{}
Новый UUID при каждом retry на клиентеR-DIST-IDEM-X3один ключ на бизнес-операцию, retry с тем же
Проверка processed_event вне транзакцииR-DIST-IDEM-2queries.WithTx(tx) — проверка и запись в одной pgx.Tx
TTL idempotency < 24 часовR-DIST-IDEM-524–72 часа + очистка по expires_at
Один слой защиты для moneyR-DIST-IDEM-4client key + UNIQUE (provider, external_payment_id) в БД

Куда дальше

  • Outbox + Inbox — outbox обеспечивает producer-side гарантии at-least-once.
  • Saga — каждый шаг саги обязан быть идемпотентным; saga_id в каждом сообщении.
  • Compensation — compensation тоже идемпотентен: проверка по (saga_id, step).
  • Eventual consistency — causal consistency через event.Version > current_version.
  • Distributed Transactions — чего не делать — почему 2PC не работает в Go-стеке.
  • When to use distributed patterns — когда вводить паттерны, когда достаточно одной pgx.Tx.