Опирается на правила:
R-KFK-IDEM-1…R-KFK-IDEM-4иR-KFK-IDEM-X1…R-KFK-IDEM-X2из Kafka Style Guide → раздел 4. Idempotent consumer.
Важно знать
- Kafka — at-least-once: дубликаты норма, consumer обязан с ними справляться.
- Уникальный
eventId(UUID v7) в payload — единственный надёжный dedup-ключ.processed_eventтаблица с PRIMARY KEY наevent_id— UNIQUE constraint защищает даже под race conditions.- Запись в
processed_eventи бизнес-результат — в одной транзакции.- Money — двойная защита:
eventId+Idempotency-Keyна downstream HTTP.- TTL через partitioning + drop_old, либо background-job.
- Kafka offset как dedup — нельзя: offset зависит от consumer-group.
Любой Kafka consumer обязан быть idempotent. Producer с enable.idempotence: true устраняет дубликаты на уровне producer-partition, но дубликаты возможны при rebalance до ack, при DLQ replay, при offset reset. Receiver должен сам отделять «уже обрабатывал» от «впервые вижу».
Уникальный eventId в payload
R-KFK-IDEM-1: каждое событие имеет UUID v7 идентификатор.
public record OrderConfirmedEvent(
UUID eventId,
String eventType,
OffsetDateTime occurredAt,
Long orderId,
Long customerId,
Money totalAmount
) {
public static OrderConfirmedEvent from(Order order) {
return new OrderConfirmedEvent(
UuidV7.generate(),
"order.confirmed.v1",
order.getConfirmedAt(),
order.getId(),
order.getCustomerId(),
order.getTotalAmount()
);
}
}
UUID v7 — time-sortable (первые 48 bits timestamp). Это даёт sequential insert в processed_event PK B-tree (низкая фрагментация) + возможность сортировать события по времени без отдельного occurredAt индекса.
processed_event таблица
R-KFK-IDEM-2: schema.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);
PRIMARY KEY на event_id — UNIQUE constraint обеспечивает дедупликацию на уровне БД. Даже если два thread-а одного consumer попытаются обработать дубль одновременно — один INSERT успешен, второй ловит ConstraintViolationException.
consumer_group колонка нужна когда разные consumer-groups читают тот же топик и обрабатывают одно и то же eventId независимо (например, billing-service и analytics-service оба слушают order-events). PRIMARY KEY на event_id тогда не подходит — заменить на PRIMARY KEY (event_id, consumer_group).
Dedup в листенере
R-KFK-IDEM-3: проверка + бизнес-логика в одной транзакции.
@Component
@RequiredArgsConstructor
public class OrderConfirmedListener {
private final ProcessedEventRepository processedEventRepository;
private final BillingService billingService;
@KafkaListener(topics = "order-service.order.confirmed", groupId = "billing-service")
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-service")) {
ack.acknowledge();
return;
}
billingService.charge(event.orderId(), event.totalAmount());
ack.acknowledge();
}
}
tryMarkProcessed — INSERT ... ON CONFLICT DO NOTHING возвращает true при successful insert, false при duplicate.
public boolean tryMarkProcessed(UUID eventId, String consumerGroup) {
var rows = dsl.insertInto(PROCESSED_EVENT)
.columns(PROCESSED_EVENT.EVENT_ID, PROCESSED_EVENT.CONSUMER_GROUP)
.values(eventId, consumerGroup)
.onConflictDoNothing()
.execute();
return rows > 0;
}
Транзакция оборачивает обе операции (tryMarkProcessed + billingService.charge). Если charge бросает exception — processed_event тоже откатывается, next poll получает то же событие и обрабатывает заново.
Если процесс упал между charge commit и ack.acknowledge() — processed_event уже сохранил eventId, следующий poll увидит «уже processed» и просто ack-нет дубль. Идемпотентность работает.
Money — двойная защита
R-KFK-IDEM-4: для money-операций — eventId + Idempotency-Key на downstream HTTP.
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-service")) {
ack.acknowledge();
return;
}
paymentClient.charge(
event.orderId(),
event.totalAmount(),
Map.of("Idempotency-Key", event.eventId().toString())
);
ack.acknowledge();
}
Сценарий: payment-provider получил charge, ответил 200 OK, но connection reset → consumer не получил response → транзакция откатилась → processed_event пуст. Следующий poll → tryMarkProcessed=true → второй charge.
Без Idempotency-Key payment-provider обрабатывает charge дважды. С Idempotency-Key = eventId provider дедуплицирует на своей стороне (см. Distributed → idempotency).
TTL processed_event
Таблица растёт линейно с потоком событий. TTL обязателен.
Вариант 1 — partitioning:
CREATE TABLE processed_event (
event_id uuid NOT NULL,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);
CREATE TABLE processed_event_2026_05 PARTITION OF processed_event
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
Каждый месяц — отдельная partition; старые drop-аем целиком. Cleanup мгновенный (DROP TABLE vs миллионы DELETE).
Вариант 2 — простой background job:
@Scheduled(cron = "0 0 3 * * *", zone = "UTC")
public void cleanup() {
var cutoff = Instant.now().minus(7, ChronoUnit.DAYS);
processedEventRepository.deleteOlderThan(cutoff);
}
Cleanup ночью. Retention 7 дней — нормально для большинства случаев: дубликаты приходят в пределах часов, не дней.
Что запрещено
Listener без проверки eventId
R-KFK-IDEM-X1: «обычно срабатывает один раз» — рассуждение, после которого инцидент с двойным списанием.
Любой production-listener обязан иметь dedup. Без него — двойные emails, двойные charges, двойные order confirmations. Стоимость инцидента сильно выше стоимости одной processed_event таблицы.
Kafka offset как dedup-ключ
R-KFK-IDEM-X2: offset зависит от consumer-group, не от события.
Сценарий поломки:
- Consumer-group
billing-serviceобработал offset 0-1000. - Запускаем новый consumer-group
analytics-service,auto.offset.reset=earliest. - Analytics получает события с offset 0-1000.
- Если бы dedup был по offset — analytics никогда не обрабатывал бы события 0-1000 (offset уже «использован» в billing).
Offset — это позиция в логе для конкретного consumer-group, не идентификатор события. Dedup-ключ — eventId в payload, генерируемый producer-ом.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Listener без проверки eventId для critical | R-KFK-IDEM-X1 | processed_event + tryMarkProcessed |
| Kafka offset как dedup-ключ | R-KFK-IDEM-X2 | eventId UUID v7 |
| Money без двойной защиты | R-KFK-IDEM-4 | eventId + Idempotency-Key HTTP |
processed_event без TTL | R-KFK-IDEM-2 | partitioning или background-job |
tryMarkProcessed без ON CONFLICT DO NOTHING | R-KFK-IDEM-2 | INSERT ... ON CONFLICT DO NOTHING |
Dedup-логика без @Transactional | R-KFK-IDEM-3 | проверка + бизнес-логика в одной TX |
| Random UUID, не UUID v7 | R-KFK-IDEM-1 | UUID v7 (time-sortable) |
eventId в Kafka header вместо payload | R-KFK-IDEM-1 | в payload как первое поле record |
Куда дальше
- Kafka → раздел 4. Idempotent consumer — нормативные формулировки.
- Consumer — manual ack, group.id.
- Distributed → idempotency — теория at-least-once,
Idempotency-Key. - Event design —
eventIdв payload, форматы. - Outbox publishing —
eventIdгенерируется producer-side. - Retry topic + DLQ — DLQ replay часто = дубль.
- PG schema → partitioning — partitioning
processed_event.