Опирается на правила: R-KFK-IDEM-1R-KFK-IDEM-4 и R-KFK-IDEM-X1R-KFK-IDEM-X2 из Kafka Style Guide → раздел 4. Idempotent consumer.

Важно знать

  • Kafka — at-least-once: дубликаты норма, consumer обязан с ними справляться.
  • Уникальный eventId (UUID v7) в payload — единственный надёжный dedup-ключ.
  • processed_event таблица с PRIMARY KEY на event_id — UNIQUE constraint защищает даже под race conditions.
  • Запись в processed_event и бизнес-результат — в одной транзакции.
  • Money — двойная защита: eventId + Idempotency-Key на downstream HTTP.
  • TTL через partitioning + drop_old, либо background-job.
  • Kafka offset как dedup — нельзя: offset зависит от consumer-group.

Любой Kafka consumer обязан быть idempotent. Producer с enable.idempotence: true устраняет дубликаты на уровне producer-partition, но дубликаты возможны при rebalance до ack, при DLQ replay, при offset reset. Receiver должен сам отделять «уже обрабатывал» от «впервые вижу».

Уникальный eventId в payload

R-KFK-IDEM-1: каждое событие имеет UUID v7 идентификатор.

public record OrderConfirmedEvent(
    UUID eventId,
    String eventType,
    OffsetDateTime occurredAt,
    Long orderId,
    Long customerId,
    Money totalAmount
) {
    public static OrderConfirmedEvent from(Order order) {
        return new OrderConfirmedEvent(
            UuidV7.generate(),
            "order.confirmed.v1",
            order.getConfirmedAt(),
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount()
        );
    }
}

UUID v7 — time-sortable (первые 48 bits timestamp). Это даёт sequential insert в processed_event PK B-tree (низкая фрагментация) + возможность сортировать события по времени без отдельного occurredAt индекса.

processed_event таблица

R-KFK-IDEM-2: schema.

CREATE TABLE processed_event (
    event_id       uuid PRIMARY KEY,
    consumer_group text NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);

PRIMARY KEY на event_id — UNIQUE constraint обеспечивает дедупликацию на уровне БД. Даже если два thread-а одного consumer попытаются обработать дубль одновременно — один INSERT успешен, второй ловит ConstraintViolationException.

consumer_group колонка нужна когда разные consumer-groups читают тот же топик и обрабатывают одно и то же eventId независимо (например, billing-service и analytics-service оба слушают order-events). PRIMARY KEY на event_id тогда не подходит — заменить на PRIMARY KEY (event_id, consumer_group).

Dedup в листенере

R-KFK-IDEM-3: проверка + бизнес-логика в одной транзакции.

@Component
@RequiredArgsConstructor
public class OrderConfirmedListener {

    private final ProcessedEventRepository processedEventRepository;
    private final BillingService billingService;

    @KafkaListener(topics = "order-service.order.confirmed", groupId = "billing-service")
    @Transactional
    public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
        if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-service")) {
            ack.acknowledge();
            return;
        }
        billingService.charge(event.orderId(), event.totalAmount());
        ack.acknowledge();
    }
}

tryMarkProcessedINSERT ... ON CONFLICT DO NOTHING возвращает true при successful insert, false при duplicate.

public boolean tryMarkProcessed(UUID eventId, String consumerGroup) {
    var rows = dsl.insertInto(PROCESSED_EVENT)
        .columns(PROCESSED_EVENT.EVENT_ID, PROCESSED_EVENT.CONSUMER_GROUP)
        .values(eventId, consumerGroup)
        .onConflictDoNothing()
        .execute();
    return rows > 0;
}

Транзакция оборачивает обе операции (tryMarkProcessed + billingService.charge). Если charge бросает exception — processed_event тоже откатывается, next poll получает то же событие и обрабатывает заново.

Если процесс упал между charge commit и ack.acknowledge()processed_event уже сохранил eventId, следующий poll увидит «уже processed» и просто ack-нет дубль. Идемпотентность работает.

Money — двойная защита

R-KFK-IDEM-4: для money-операций — eventId + Idempotency-Key на downstream HTTP.

@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-service")) {
        ack.acknowledge();
        return;
    }
    paymentClient.charge(
        event.orderId(),
        event.totalAmount(),
        Map.of("Idempotency-Key", event.eventId().toString())
    );
    ack.acknowledge();
}

Сценарий: payment-provider получил charge, ответил 200 OK, но connection reset → consumer не получил response → транзакция откатилась → processed_event пуст. Следующий poll → tryMarkProcessed=true → второй charge.

Без Idempotency-Key payment-provider обрабатывает charge дважды. С Idempotency-Key = eventId provider дедуплицирует на своей стороне (см. Distributed → idempotency).

TTL processed_event

Таблица растёт линейно с потоком событий. TTL обязателен.

Вариант 1 — partitioning:

CREATE TABLE processed_event (
    event_id       uuid NOT NULL,
    consumer_group text NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);

CREATE TABLE processed_event_2026_05 PARTITION OF processed_event
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

Каждый месяц — отдельная partition; старые drop-аем целиком. Cleanup мгновенный (DROP TABLE vs миллионы DELETE).

Вариант 2 — простой background job:

@Scheduled(cron = "0 0 3 * * *", zone = "UTC")
public void cleanup() {
    var cutoff = Instant.now().minus(7, ChronoUnit.DAYS);
    processedEventRepository.deleteOlderThan(cutoff);
}

Cleanup ночью. Retention 7 дней — нормально для большинства случаев: дубликаты приходят в пределах часов, не дней.

Что запрещено

Listener без проверки eventId

R-KFK-IDEM-X1: «обычно срабатывает один раз» — рассуждение, после которого инцидент с двойным списанием.

Любой production-listener обязан иметь dedup. Без него — двойные emails, двойные charges, двойные order confirmations. Стоимость инцидента сильно выше стоимости одной processed_event таблицы.

Kafka offset как dedup-ключ

R-KFK-IDEM-X2: offset зависит от consumer-group, не от события.

Сценарий поломки:

  1. Consumer-group billing-service обработал offset 0-1000.
  2. Запускаем новый consumer-group analytics-service, auto.offset.reset=earliest.
  3. Analytics получает события с offset 0-1000.
  4. Если бы dedup был по offset — analytics никогда не обрабатывал бы события 0-1000 (offset уже «использован» в billing).

Offset — это позиция в логе для конкретного consumer-group, не идентификатор события. Dedup-ключ — eventId в payload, генерируемый producer-ом.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Listener без проверки eventId для criticalR-KFK-IDEM-X1processed_event + tryMarkProcessed
Kafka offset как dedup-ключR-KFK-IDEM-X2eventId UUID v7
Money без двойной защитыR-KFK-IDEM-4eventId + Idempotency-Key HTTP
processed_event без TTLR-KFK-IDEM-2partitioning или background-job
tryMarkProcessed без ON CONFLICT DO NOTHINGR-KFK-IDEM-2INSERT ... ON CONFLICT DO NOTHING
Dedup-логика без @TransactionalR-KFK-IDEM-3проверка + бизнес-логика в одной TX
Random UUID, не UUID v7R-KFK-IDEM-1UUID v7 (time-sortable)
eventId в Kafka header вместо payloadR-KFK-IDEM-1в payload как первое поле record

Куда дальше

  • Kafka → раздел 4. Idempotent consumer — нормативные формулировки.
  • Consumer — manual ack, group.id.
  • Distributed → idempotency — теория at-least-once, Idempotency-Key.
  • Event design — eventId в payload, форматы.
  • Outbox publishing — eventId генерируется producer-side.
  • Retry topic + DLQ — DLQ replay часто = дубль.
  • PG schema → partitioning — partitioning processed_event.