Опирается на правила: R-KFK-IDEM-1R-KFK-IDEM-4 и R-KFK-IDEM-X1R-KFK-IDEM-X2 из Kafka Style Guide → раздел 4. Idempotent consumer.

Важно знать

  • Kafka — at-least-once: rebalance до ack, DLQ replay, offset reset — каждый сценарий даёт дубль; consumer обязан с ним справляться.
  • Уникальный EventID (UUID v7) в payload — единственный надёжный dedup-ключ; Kafka offset зависит от consumer-group и не годится.
  • processed_event таблица с PRIMARY KEY на event_id — UNIQUE на уровне БД защищает даже при гонке двух горутин.
  • TryInsert и бизнес-результат — в одной pgx.Tx: если горутина упала после записи в БД, но до CommitMessages — следующий poll увидит «уже processed» и просто ack-нет дубль.
  • Money — двойная защита: EventID + Idempotency-Key на downstream HTTP-вызовах.
  • TTL таблицы — partitioning + DROP TABLE по месяцу или фоновая горутина с DELETE WHERE processed_at < cutoff.
  • errors.As по типу — классификация transient/permanent для routing в retry или DLQ.

Любой Kafka consumer в Go обязан быть idempotent. kafka.Writer с RequiredAcks: kafka.RequireAll устраняет дубли на уровне producer-partition, но дубликаты при rebalance до CommitMessages, при DLQ replay и при offset reset — ответственность consumer. Горутина должна сама отличать «уже обрабатывал» от «впервые вижу».

Уникальный EventID в payload

R-KFK-IDEM-1: каждое событие несёт UUID v7 как EventID.

// core/order/event/order_confirmed.go
type OrderConfirmedEvent struct {
    EventID     string    `json:"event_id"`
    EventType   string    `json:"event_type"`
    OccurredAt  time.Time `json:"occurred_at"`
    OrderID     string    `json:"order_id"`
    CustomerID  string    `json:"customer_id"`
    TotalAmount int64     `json:"total_amount"` // минорные единицы
}

func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
    return OrderConfirmedEvent{
        EventID:     uuidv7.New().String(),
        EventType:   "OrderConfirmed",
        OccurredAt:  order.ConfirmedAt,
        OrderID:     order.ID,
        CustomerID:  order.CustomerID,
        TotalAmount: order.TotalAmount,
    }
}

UUID v7 — time-sortable (первые 48 бит — timestamp). Это даёт sequential insert в B-tree индекс processed_event с минимальной фрагментацией, а сортировка событий по EventID работает без отдельного индекса на occurred_at.

processed_event таблица

R-KFK-IDEM-2: DDL с PRIMARY KEY на event_id.

CREATE TABLE processed_event (
    event_id       uuid        PRIMARY KEY,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_at ON processed_event (processed_at);

PRIMARY KEY обеспечивает UNIQUE-ограничение на уровне БД. Даже если две горутины пытаются вставить один event_id одновременно — одна получает INSERT 1, вторая получает ошибку нарушения уникальности, которую TryInsert превращает в false.

Если один топик обслуживают несколько независимых consumer-group (например, billing-order-confirmed и analytics-order-confirmed), замени PRIMARY KEY:

ALTER TABLE processed_event
    DROP CONSTRAINT processed_event_pkey,
    ADD PRIMARY KEY (event_id, consumer_group);

TryInsert — dedup-примитив

R-KFK-IDEM-2/3: INSERT ... ON CONFLICT DO NOTHING, возвращает true при первом insert, false при дубле.

// infra/postgres/dedup_repo.go
type DedupRepo struct{ db *pgxpool.Pool }

func (r *DedupRepo) TryInsert(ctx context.Context, tx pgx.Tx, eventID, group string) (bool, error) {
    tag, err := tx.Exec(ctx,
        `INSERT INTO processed_event (event_id, consumer_group)
         VALUES ($1, $2)
         ON CONFLICT DO NOTHING`,
        eventID, group,
    )
    if err != nil {
        return false, fmt.Errorf("dedup insert: %w", err)
    }
    return tag.RowsAffected() == 1, nil
}

Dedup в handler-горутине

R-KFK-IDEM-3: TryInsert и бизнес-логика — в одной pgx.Tx.

// adapters/in/kafka/order_confirmed_handler.go
type OrderConfirmedHandler struct {
    db      *pgxpool.Pool
    dedup   *DedupRepo
    billing *BillingService
}

const group = "billing-order-confirmed"

func (h *OrderConfirmedHandler) Handle(ctx context.Context, evt OrderConfirmedEvent) error {
    tx, err := h.db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin: %w", err)
    }
    defer tx.Rollback(ctx)

    inserted, err := h.dedup.TryInsert(ctx, tx, evt.EventID, group)
    if err != nil {
        return err
    }
    if !inserted {
        slog.InfoContext(ctx, "duplicate skipped", "event_id", evt.EventID)
        return nil
    }

    if err := h.billing.ApplyOrderConfirmed(ctx, tx, evt); err != nil {
        return err
    }
    return tx.Commit(ctx)
}

Транзакция оборачивает обе операции. Если горутина упала после Commit, но до CommitMessagesprocessed_event уже содержит EventID. Следующий poll ловит дубль, TryInsert возвращает false, горутина логирует и возвращает nil. Жизненный цикл замкнут.

Consumer-цикл с manual commit

R-KFK-CONS-2, R-KFK-IDEM-1/3: CommitMessages — только после успешного Handle.

// adapters/in/kafka/order_confirmed_consumer.go
type OrderConfirmedConsumer struct {
    reader  *kafka.Reader
    handler *OrderConfirmedHandler
}

func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                return nil
            }
            return fmt.Errorf("fetch: %w", err)
        }

        var evt OrderConfirmedEvent
        if err := json.Unmarshal(msg.Value, &evt); err != nil {
            slog.ErrorContext(ctx, "unmarshal failed, routing to DLQ",
                "error", err, "offset", msg.Offset)
            c.sendToDLQ(ctx, msg, err)
            _ = c.reader.CommitMessages(ctx, msg)
            continue
        }

        if err := c.handler.Handle(ctx, evt); err != nil {
            if isTransient(err) {
                c.sendToRetry(ctx, msg)
            } else {
                c.sendToDLQ(ctx, msg, err)
            }
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit: %w", err)
        }
    }
}

FetchMessage + явный CommitMessages (а не ReadMessage) — это и есть manual commit в kafka-go (CommitInterval: 0 на ReaderConfig).

Money — двойная защита

R-KFK-IDEM-4: для операций со счётом добавь Idempotency-Key на исходящий HTTP-запрос.

// adapters/out/http/payment_client.go
func (c *PaymentClient) Charge(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        c.baseURL+"/charges",
        encodeCharge(evt.OrderID, evt.TotalAmount),
    )
    if err != nil {
        return fmt.Errorf("build charge request: %w", err)
    }
    req.Header.Set("Idempotency-Key", evt.EventID) // R-KFK-IDEM-4

    resp, err := c.http.Do(req)
    if err != nil {
        return &GatewayError{Cause: err}
    }
    defer resp.Body.Close()

    if resp.StatusCode == http.StatusConflict {
        return nil // provider уже обработал этот EventID
    }
    if resp.StatusCode >= 500 {
        return &GatewayError{Status: resp.StatusCode}
    }
    return nil
}

Сценарий без Idempotency-Key: provider ответил 200, connection reset → горутина не получила ответ → Commit не случился → processed_event пуст → следующий poll → второй charge. С Idempotency-Key = EventID provider дедуплицирует на своей стороне.

isTransient классифицирует ошибку через errors.As по типу, не строке:

func isTransient(err error) bool {
    var ge *GatewayError
    return errors.As(err, &ge)
}

TTL processed_event

Таблица растёт линейно с потоком событий — TTL обязателен.

Вариант 1 — partitioning (рекомендован при высоком throughput):

CREATE TABLE processed_event (
    event_id       uuid        NOT NULL,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);

CREATE TABLE processed_event_2026_06 PARTITION OF processed_event
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

DROP TABLE processed_event_2026_04 вместо миллионов DELETE — cleanup мгновенный.

Вариант 2 — фоновая горутина (достаточно при умеренном потоке):

// infra/kafka/dedup_cleanup.go
func RunDedupCleanup(ctx context.Context, db *pgxpool.Pool, retention time.Duration) {
    ticker := time.NewTicker(24 * time.Hour)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            cutoff := time.Now().Add(-retention)
            tag, err := db.Exec(ctx,
                `DELETE FROM processed_event WHERE processed_at < $1`, cutoff)
            if err != nil {
                slog.ErrorContext(ctx, "dedup cleanup error", "error", err)
                continue
            }
            slog.InfoContext(ctx, "dedup cleanup done",
                "deleted", tag.RowsAffected(), "cutoff", cutoff)
        }
    }
}

Retention 7 дней — нормально: дубликаты приходят в пределах часов, не дней.

Что запрещено

АнтипаттернПравилоЧто взамен
Handler без проверки EventID для critical consumerR-KFK-IDEM-X1TryInsert в processed_event перед бизнес-логикой
Kafka offset как dedup-ключR-KFK-IDEM-X2EventID UUID v7 в payload
TryInsert и бизнес-операция в разных транзакцияхR-KFK-IDEM-3одна pgx.Tx на обе операции
Money без Idempotency-Key на downstream HTTPR-KFK-IDEM-4req.Header.Set("Idempotency-Key", evt.EventID)
processed_event без TTLR-KFK-IDEM-2partitioning или cleanup-горутина
INSERT без ON CONFLICT DO NOTHINGR-KFK-IDEM-2RowsAffected() == 1 как сигнал first-write
EventID не UUID v7, а случайный UUID v4R-KFK-IDEM-1UUID v7 (time-sortable → sequential PK insert)
CommitMessages до HandleR-KFK-CONS-2CommitMessages только после успеха

Куда дальше

  • Consumer — FetchMessage / CommitMessages, GroupID, StartOffset.
  • Event design — структура OrderConfirmedEvent, UUID v7, int64 для денег.
  • Outbox publishing — EventID генерируется на стороне producer через outbox.
  • Retry topic + DLQ — DLQ replay = дубль; почему isTransient обязателен.
  • Observability — traceparent в Kafka headers, promauto метрики.
  • Конфигурация — KafkaConfig через envconfig, CommitInterval: 0.
  • Producer — RequiredAcks: kafka.RequireAll, partition key.
  • Security — TLS dialer, per-service ClientID.