Опирается на правила:
R-KFK-IDEM-1…R-KFK-IDEM-4иR-KFK-IDEM-X1…R-KFK-IDEM-X2из Kafka Style Guide → раздел 4. Idempotent consumer.
Важно знать
- Kafka — at-least-once: rebalance до
ack, DLQ replay, offset reset — каждый сценарий даёт дубль; consumer обязан с ним справляться.- Уникальный
EventID(UUID v7) в payload — единственный надёжный dedup-ключ; Kafka offset зависит от consumer-group и не годится.processed_eventтаблица с PRIMARY KEY наevent_id— UNIQUE на уровне БД защищает даже при гонке двух горутин.TryInsertи бизнес-результат — в однойpgx.Tx: если горутина упала после записи в БД, но доCommitMessages— следующий poll увидит «уже processed» и просто ack-нет дубль.- Money — двойная защита:
EventID+Idempotency-Keyна downstream HTTP-вызовах.- TTL таблицы — partitioning +
DROP TABLEпо месяцу или фоновая горутина сDELETE WHERE processed_at < cutoff.errors.Asпо типу — классификация transient/permanent для routing в retry или DLQ.
Любой Kafka consumer в Go обязан быть idempotent. kafka.Writer с RequiredAcks: kafka.RequireAll устраняет дубли на уровне producer-partition, но дубликаты при rebalance до CommitMessages, при DLQ replay и при offset reset — ответственность consumer. Горутина должна сама отличать «уже обрабатывал» от «впервые вижу».
Уникальный EventID в payload
R-KFK-IDEM-1: каждое событие несёт UUID v7 как EventID.
// core/order/event/order_confirmed.go
type OrderConfirmedEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
OccurredAt time.Time `json:"occurred_at"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalAmount int64 `json:"total_amount"` // минорные единицы
}
func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
return OrderConfirmedEvent{
EventID: uuidv7.New().String(),
EventType: "OrderConfirmed",
OccurredAt: order.ConfirmedAt,
OrderID: order.ID,
CustomerID: order.CustomerID,
TotalAmount: order.TotalAmount,
}
}
UUID v7 — time-sortable (первые 48 бит — timestamp). Это даёт sequential insert в B-tree индекс processed_event с минимальной фрагментацией, а сортировка событий по EventID работает без отдельного индекса на occurred_at.
processed_event таблица
R-KFK-IDEM-2: DDL с PRIMARY KEY на event_id.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_at ON processed_event (processed_at);
PRIMARY KEY обеспечивает UNIQUE-ограничение на уровне БД. Даже если две горутины пытаются вставить один event_id одновременно — одна получает INSERT 1, вторая получает ошибку нарушения уникальности, которую TryInsert превращает в false.
Если один топик обслуживают несколько независимых consumer-group (например, billing-order-confirmed и analytics-order-confirmed), замени PRIMARY KEY:
ALTER TABLE processed_event
DROP CONSTRAINT processed_event_pkey,
ADD PRIMARY KEY (event_id, consumer_group);
TryInsert — dedup-примитив
R-KFK-IDEM-2/3: INSERT ... ON CONFLICT DO NOTHING, возвращает true при первом insert, false при дубле.
// infra/postgres/dedup_repo.go
type DedupRepo struct{ db *pgxpool.Pool }
func (r *DedupRepo) TryInsert(ctx context.Context, tx pgx.Tx, eventID, group string) (bool, error) {
tag, err := tx.Exec(ctx,
`INSERT INTO processed_event (event_id, consumer_group)
VALUES ($1, $2)
ON CONFLICT DO NOTHING`,
eventID, group,
)
if err != nil {
return false, fmt.Errorf("dedup insert: %w", err)
}
return tag.RowsAffected() == 1, nil
}
Dedup в handler-горутине
R-KFK-IDEM-3: TryInsert и бизнес-логика — в одной pgx.Tx.
// adapters/in/kafka/order_confirmed_handler.go
type OrderConfirmedHandler struct {
db *pgxpool.Pool
dedup *DedupRepo
billing *BillingService
}
const group = "billing-order-confirmed"
func (h *OrderConfirmedHandler) Handle(ctx context.Context, evt OrderConfirmedEvent) error {
tx, err := h.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback(ctx)
inserted, err := h.dedup.TryInsert(ctx, tx, evt.EventID, group)
if err != nil {
return err
}
if !inserted {
slog.InfoContext(ctx, "duplicate skipped", "event_id", evt.EventID)
return nil
}
if err := h.billing.ApplyOrderConfirmed(ctx, tx, evt); err != nil {
return err
}
return tx.Commit(ctx)
}
Транзакция оборачивает обе операции. Если горутина упала после Commit, но до CommitMessages — processed_event уже содержит EventID. Следующий poll ловит дубль, TryInsert возвращает false, горутина логирует и возвращает nil. Жизненный цикл замкнут.
Consumer-цикл с manual commit
R-KFK-CONS-2, R-KFK-IDEM-1/3: CommitMessages — только после успешного Handle.
// adapters/in/kafka/order_confirmed_consumer.go
type OrderConfirmedConsumer struct {
reader *kafka.Reader
handler *OrderConfirmedHandler
}
func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("fetch: %w", err)
}
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
slog.ErrorContext(ctx, "unmarshal failed, routing to DLQ",
"error", err, "offset", msg.Offset)
c.sendToDLQ(ctx, msg, err)
_ = c.reader.CommitMessages(ctx, msg)
continue
}
if err := c.handler.Handle(ctx, evt); err != nil {
if isTransient(err) {
c.sendToRetry(ctx, msg)
} else {
c.sendToDLQ(ctx, msg, err)
}
}
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit: %w", err)
}
}
}
FetchMessage + явный CommitMessages (а не ReadMessage) — это и есть manual commit в kafka-go (CommitInterval: 0 на ReaderConfig).
Money — двойная защита
R-KFK-IDEM-4: для операций со счётом добавь Idempotency-Key на исходящий HTTP-запрос.
// adapters/out/http/payment_client.go
func (c *PaymentClient) Charge(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
c.baseURL+"/charges",
encodeCharge(evt.OrderID, evt.TotalAmount),
)
if err != nil {
return fmt.Errorf("build charge request: %w", err)
}
req.Header.Set("Idempotency-Key", evt.EventID) // R-KFK-IDEM-4
resp, err := c.http.Do(req)
if err != nil {
return &GatewayError{Cause: err}
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusConflict {
return nil // provider уже обработал этот EventID
}
if resp.StatusCode >= 500 {
return &GatewayError{Status: resp.StatusCode}
}
return nil
}
Сценарий без Idempotency-Key: provider ответил 200, connection reset → горутина не получила ответ → Commit не случился → processed_event пуст → следующий poll → второй charge. С Idempotency-Key = EventID provider дедуплицирует на своей стороне.
isTransient классифицирует ошибку через errors.As по типу, не строке:
func isTransient(err error) bool {
var ge *GatewayError
return errors.As(err, &ge)
}
TTL processed_event
Таблица растёт линейно с потоком событий — TTL обязателен.
Вариант 1 — partitioning (рекомендован при высоком throughput):
CREATE TABLE processed_event (
event_id uuid NOT NULL,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);
CREATE TABLE processed_event_2026_06 PARTITION OF processed_event
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
DROP TABLE processed_event_2026_04 вместо миллионов DELETE — cleanup мгновенный.
Вариант 2 — фоновая горутина (достаточно при умеренном потоке):
// infra/kafka/dedup_cleanup.go
func RunDedupCleanup(ctx context.Context, db *pgxpool.Pool, retention time.Duration) {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cutoff := time.Now().Add(-retention)
tag, err := db.Exec(ctx,
`DELETE FROM processed_event WHERE processed_at < $1`, cutoff)
if err != nil {
slog.ErrorContext(ctx, "dedup cleanup error", "error", err)
continue
}
slog.InfoContext(ctx, "dedup cleanup done",
"deleted", tag.RowsAffected(), "cutoff", cutoff)
}
}
}
Retention 7 дней — нормально: дубликаты приходят в пределах часов, не дней.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Handler без проверки EventID для critical consumer | R-KFK-IDEM-X1 | TryInsert в processed_event перед бизнес-логикой |
| Kafka offset как dedup-ключ | R-KFK-IDEM-X2 | EventID UUID v7 в payload |
TryInsert и бизнес-операция в разных транзакциях | R-KFK-IDEM-3 | одна pgx.Tx на обе операции |
Money без Idempotency-Key на downstream HTTP | R-KFK-IDEM-4 | req.Header.Set("Idempotency-Key", evt.EventID) |
processed_event без TTL | R-KFK-IDEM-2 | partitioning или cleanup-горутина |
INSERT без ON CONFLICT DO NOTHING | R-KFK-IDEM-2 | RowsAffected() == 1 как сигнал first-write |
EventID не UUID v7, а случайный UUID v4 | R-KFK-IDEM-1 | UUID v7 (time-sortable → sequential PK insert) |
CommitMessages до Handle | R-KFK-CONS-2 | CommitMessages только после успеха |
Куда дальше
- Consumer —
FetchMessage/CommitMessages,GroupID,StartOffset. - Event design — структура
OrderConfirmedEvent, UUID v7,int64для денег. - Outbox publishing —
EventIDгенерируется на стороне producer через outbox. - Retry topic + DLQ — DLQ replay = дубль; почему
isTransientобязателен. - Observability —
traceparentв Kafka headers,promautoметрики. - Конфигурация —
KafkaConfigчерезenvconfig,CommitInterval: 0. - Producer —
RequiredAcks: kafka.RequireAll, partition key. - Security — TLS dialer, per-service
ClientID.