Опирается на правила:
R-KFK-IDEM-1…R-KFK-IDEM-4иR-KFK-IDEM-X1…R-KFK-IDEM-X2из Kafka Rules → раздел 4. Idempotent consumer.
Важно знать
- Kafka — at-least-once: дубликаты при rebalance, DLQ replay, offset reset — норма, не аномалия.
- Уникальный
eventId(UUID v7) в payload — единственный надёжный dedup-ключ.processed_eventтаблица с PK наevent_id— UNIQUE constraint дедуплицирует даже под race conditions.orIgnore()insert и бизнес-результат — в однойDataSource.transaction.- Money — двойная защита:
eventId+Idempotency-Keyна downstream HTTP.- TTL через partitioning + drop_old, либо
@Intervalbackground-job.- Kafka offset как dedup-ключ запрещён: offset зависит от consumer-group.
autoCommit: false— offset коммитится только после успешной обработки, иначе дубль невидим.
Любой kafkajs-consumer обязан быть idempotent. Producer с idempotent: true снимает дубли на уровне producer-partition, но не защищает при rebalance до commitOffsets, при DLQ replay, при сбросе offset. Ответственность за «уже видели» лежит на стороне consumer.
Уникальный eventId в payload
R-KFK-IDEM-1: каждое событие содержит UUID v7.
// core/order/domain/event/order-confirmed.event.ts
import { v7 as uuidv7 } from 'uuid';
export interface OrderConfirmedEvent {
readonly eventId: string; // UUID v7
readonly eventType: string; // 'order.confirmed.v1'
readonly occurredAt: string; // ISO-8601
readonly aggregateType: 'Order';
readonly orderId: number;
readonly customerId: number;
readonly totalAmount: number;
readonly currency: string;
}
export function buildOrderConfirmedEvent(order: {
id: number;
customerId: number;
totalAmount: number;
currency: string;
confirmedAt: Date;
}): OrderConfirmedEvent {
return {
eventId: uuidv7(),
eventType: 'order.confirmed.v1',
occurredAt: order.confirmedAt.toISOString(),
aggregateType: 'Order',
orderId: order.id,
customerId: order.customerId,
totalAmount: order.totalAmount,
currency: order.currency,
};
}
UUID v7 — time-sortable (первые 48 бит — timestamp). INSERT в processed_event по PK B-tree последовательный, фрагментация минимальна. По eventId можно восстановить хронологию без отдельного индекса по occurredAt.
processed_event таблица
R-KFK-IDEM-2: DDL схема.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at
ON processed_event (processed_at);
PK на event_id — UNIQUE constraint на уровне БД. Два параллельных eachMessage на одном дубле: один INSERT проходит, второй молча игнорируется (orIgnore()). Без этого оба дойдут до бизнес-логики.
Если несколько consumer-groups читают один топик и обрабатывают один eventId независимо (например, billing-service и notification-service оба слушают orders.confirmed), заменить PK:
ALTER TABLE processed_event DROP CONSTRAINT processed_event_pkey;
ALTER TABLE processed_event ADD PRIMARY KEY (event_id, consumer_group);
Тогда billing-service и notification-service хранят eventId раздельно и не мешают друг другу.
TypeORM entity
// infrastructure/kafka/entity/processed-event.entity.ts
import { Column, Entity, PrimaryColumn } from 'typeorm';
@Entity('processed_event')
export class ProcessedEventEntity {
@PrimaryColumn({ type: 'uuid', name: 'event_id' })
eventId: string;
@Column({ type: 'text', name: 'consumer_group' })
consumerGroup: string;
@Column({ type: 'timestamptz', name: 'processed_at', default: () => 'now()' })
processedAt: Date;
}
Dedup в eachMessage
R-KFK-IDEM-3: проверка и бизнес-результат в одной транзакции.
// infrastructure/kafka/listener/order-confirmed.listener.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { Kafka } from 'kafkajs';
import { z } from 'zod';
import { KafkaConfig } from '../config/kafka.config';
import { ProcessedEventEntity } from '../entity/processed-event.entity';
import { BillingService } from '../../billing/billing.service';
const orderConfirmedSchema = z.object({
eventId: z.string().uuid(),
eventType: z.literal('order.confirmed.v1'),
occurredAt: z.string().datetime(),
orderId: z.number().int().positive(),
customerId: z.number().int().positive(),
totalAmount: z.number().positive(),
currency: z.string().length(3),
});
@Injectable()
export class OrderConfirmedListener implements OnApplicationBootstrap {
private readonly consumer;
constructor(
private readonly kafkaConfig: KafkaConfig,
private readonly dataSource: DataSource,
private readonly billingService: BillingService,
) {
const kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers,
ssl: kafkaConfig.ssl,
});
this.consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
}
async onApplicationBootstrap(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
await this.consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message }) => {
const event = orderConfirmedSchema.parse(
JSON.parse(message.value!.toString()),
);
await this.dataSource.transaction(async (manager) => {
const result = await manager
.createQueryBuilder()
.insert()
.into(ProcessedEventEntity)
.values({ eventId: event.eventId, consumerGroup: 'billing-order-confirmed' })
.orIgnore()
.execute();
if (!result.identifiers.length) {
return;
}
await this.billingService.chargeOrder(manager, event.orderId, event.totalAmount, event.currency);
});
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
},
});
}
}
orIgnore() — TypeORM-обёртка над INSERT ... ON CONFLICT DO NOTHING. Если identifiers.length === 0 — событие уже обработано, транзакция завершается без бизнес-изменений, offset коммитируется. Если identifiers.length > 0 — первый раз, обрабатываем.
commitOffsets вызывается после завершения транзакции: offset продвигается только при гарантированно успешной записи. autoCommit: false — обязательно.
Сценарий с падением между транзакцией и commitOffsets: процесс рестартует, kafkajs переполучает то же сообщение, orIgnore() ловит duplicate, offset коммитируется без повторной бизнес-операции. Всё корректно.
Money — двойная защита
R-KFK-IDEM-4: eventId на стороне consumer + Idempotency-Key на downstream HTTP.
// infrastructure/payment/payment.client.ts
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';
@Injectable()
export class PaymentClient {
constructor(private readonly httpService: HttpService) {}
async charge(params: {
orderId: number;
amount: number;
currency: string;
idempotencyKey: string;
}): Promise<void> {
await firstValueFrom(
this.httpService.post('/v1/charges', {
orderId: params.orderId,
amount: params.amount,
currency: params.currency,
}, {
headers: { 'Idempotency-Key': params.idempotencyKey },
}),
);
}
}
// в BillingService.chargeOrder
await this.paymentClient.charge({
orderId: event.orderId,
amount: event.totalAmount,
currency: event.currency,
idempotencyKey: event.eventId,
});
Сценарий поломки: payment-provider ответил 200 OK, connection reset — consumer не получил ответ, транзакция откатилась, processed_event пуст. Следующий poll: orIgnore() даёт identifiers.length > 0, второй charge уходит к провайдеру с тем же Idempotency-Key = eventId. Провайдер дедуплицирует на своей стороне. Двойного списания нет.
TTL processed_event
Таблица растёт линейно с потоком событий. Без TTL — неограниченный рост.
Вариант 1 — partitioning:
CREATE TABLE processed_event (
event_id uuid NOT NULL,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);
CREATE TABLE processed_event_2026_06
PARTITION OF processed_event
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
CREATE TABLE processed_event_2026_07
PARTITION OF processed_event
FOR VALUES FROM ('2026-07-01') TO ('2026-08-01');
Старые партиции дропаем целиком — DROP TABLE processed_event_2026_05. Cleanup мгновенный (нет DELETE по миллионам строк).
Вариант 2 — @Interval background-job:
// infrastructure/kafka/processed-event-cleanup.service.ts
import { Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { ProcessedEventEntity } from './entity/processed-event.entity';
@Injectable()
export class ProcessedEventCleanupService {
constructor(private readonly dataSource: DataSource) {}
@Interval(24 * 60 * 60 * 1000)
async cleanup(): Promise<void> {
const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
await this.dataSource
.createQueryBuilder()
.delete()
.from(ProcessedEventEntity)
.where('processedAt < :cutoff', { cutoff })
.execute();
}
}
Retention 7 дней достаточен для большинства случаев: дубли появляются в пределах часов при rebalance, не через недели. Для топиков с replay из DLQ — увеличить до 30 дней.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
eachMessage без проверки eventId для critical | R-KFK-IDEM-X1 | processed_event + orIgnore() insert |
| Kafka offset как dedup-ключ | R-KFK-IDEM-X2 | eventId UUID v7 в payload |
Money без Idempotency-Key на downstream HTTP | R-KFK-IDEM-4 | headers: { 'Idempotency-Key': event.eventId } |
processed_event без TTL | R-KFK-IDEM-2 | partitioning или @Interval cleanup |
orIgnore() вне транзакции с бизнес-результатом | R-KFK-IDEM-3 | dataSource.transaction(async (m) => { ... }) |
autoCommit: true (дефолт kafkajs!) | R-KFK-CONS-X1 | autoCommit: false + commitOffsets после TX |
| Random UUID вместо UUID v7 | R-KFK-IDEM-1 | uuidv7() из пакета uuid |
Куда дальше
- Consumer —
autoCommit: false,commitOffsets,groupId,heartbeat. - Event design —
eventIdв payload, zod-схема, forward-compatibility. - Outbox publishing —
eventIdгенерируется producer-side в outbox. - Retry topic + DLQ — DLQ replay порождает дубли, dedup обязателен.
- Observability — tracing
traceparentв headers через@opentelemetry/instrumentation-kafkajs. - Конфигурация —
KafkaConfigс zod, fail-fast на старте. - Producer —
idempotent: true,acks: -1, key = aggregate id. - Security — TLS, ACL per-сервис, PII в restricted-топиках.