Опирается на правила: R-KFK-IDEM-1R-KFK-IDEM-4 и R-KFK-IDEM-X1R-KFK-IDEM-X2 из Kafka Rules → раздел 4. Idempotent consumer.

Важно знать

  • Kafka — at-least-once: дубликаты при rebalance, DLQ replay, offset reset — норма, не аномалия.
  • Уникальный eventId (UUID v7) в payload — единственный надёжный dedup-ключ.
  • processed_event таблица с PK на event_id — UNIQUE constraint дедуплицирует даже под race conditions.
  • orIgnore() insert и бизнес-результат — в одной DataSource.transaction.
  • Money — двойная защита: eventId + Idempotency-Key на downstream HTTP.
  • TTL через partitioning + drop_old, либо @Interval background-job.
  • Kafka offset как dedup-ключ запрещён: offset зависит от consumer-group.
  • autoCommit: false — offset коммитится только после успешной обработки, иначе дубль невидим.

Любой kafkajs-consumer обязан быть idempotent. Producer с idempotent: true снимает дубли на уровне producer-partition, но не защищает при rebalance до commitOffsets, при DLQ replay, при сбросе offset. Ответственность за «уже видели» лежит на стороне consumer.

Уникальный eventId в payload

R-KFK-IDEM-1: каждое событие содержит UUID v7.

// core/order/domain/event/order-confirmed.event.ts
import { v7 as uuidv7 } from 'uuid';

export interface OrderConfirmedEvent {
  readonly eventId: string;        // UUID v7
  readonly eventType: string;      // 'order.confirmed.v1'
  readonly occurredAt: string;     // ISO-8601
  readonly aggregateType: 'Order';
  readonly orderId: number;
  readonly customerId: number;
  readonly totalAmount: number;
  readonly currency: string;
}

export function buildOrderConfirmedEvent(order: {
  id: number;
  customerId: number;
  totalAmount: number;
  currency: string;
  confirmedAt: Date;
}): OrderConfirmedEvent {
  return {
    eventId: uuidv7(),
    eventType: 'order.confirmed.v1',
    occurredAt: order.confirmedAt.toISOString(),
    aggregateType: 'Order',
    orderId: order.id,
    customerId: order.customerId,
    totalAmount: order.totalAmount,
    currency: order.currency,
  };
}

UUID v7 — time-sortable (первые 48 бит — timestamp). INSERT в processed_event по PK B-tree последовательный, фрагментация минимальна. По eventId можно восстановить хронологию без отдельного индекса по occurredAt.

processed_event таблица

R-KFK-IDEM-2: DDL схема.

CREATE TABLE processed_event (
    event_id       uuid        PRIMARY KEY,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX ix_processed_event_processed_at
    ON processed_event (processed_at);

PK на event_id — UNIQUE constraint на уровне БД. Два параллельных eachMessage на одном дубле: один INSERT проходит, второй молча игнорируется (orIgnore()). Без этого оба дойдут до бизнес-логики.

Если несколько consumer-groups читают один топик и обрабатывают один eventId независимо (например, billing-service и notification-service оба слушают orders.confirmed), заменить PK:

ALTER TABLE processed_event DROP CONSTRAINT processed_event_pkey;
ALTER TABLE processed_event ADD PRIMARY KEY (event_id, consumer_group);

Тогда billing-service и notification-service хранят eventId раздельно и не мешают друг другу.

TypeORM entity

// infrastructure/kafka/entity/processed-event.entity.ts
import { Column, Entity, PrimaryColumn } from 'typeorm';

@Entity('processed_event')
export class ProcessedEventEntity {
  @PrimaryColumn({ type: 'uuid', name: 'event_id' })
  eventId: string;

  @Column({ type: 'text', name: 'consumer_group' })
  consumerGroup: string;

  @Column({ type: 'timestamptz', name: 'processed_at', default: () => 'now()' })
  processedAt: Date;
}

Dedup в eachMessage

R-KFK-IDEM-3: проверка и бизнес-результат в одной транзакции.

// infrastructure/kafka/listener/order-confirmed.listener.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { Kafka } from 'kafkajs';
import { z } from 'zod';
import { KafkaConfig } from '../config/kafka.config';
import { ProcessedEventEntity } from '../entity/processed-event.entity';
import { BillingService } from '../../billing/billing.service';

const orderConfirmedSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('order.confirmed.v1'),
  occurredAt: z.string().datetime(),
  orderId: z.number().int().positive(),
  customerId: z.number().int().positive(),
  totalAmount: z.number().positive(),
  currency: z.string().length(3),
});

@Injectable()
export class OrderConfirmedListener implements OnApplicationBootstrap {
  private readonly consumer;

  constructor(
    private readonly kafkaConfig: KafkaConfig,
    private readonly dataSource: DataSource,
    private readonly billingService: BillingService,
  ) {
    const kafka = new Kafka({
      clientId: kafkaConfig.clientId,
      brokers: kafkaConfig.brokers,
      ssl: kafkaConfig.ssl,
    });
    this.consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      partitionsConsumedConcurrently: 3,
      eachMessage: async ({ topic, partition, message }) => {
        const event = orderConfirmedSchema.parse(
          JSON.parse(message.value!.toString()),
        );

        await this.dataSource.transaction(async (manager) => {
          const result = await manager
            .createQueryBuilder()
            .insert()
            .into(ProcessedEventEntity)
            .values({ eventId: event.eventId, consumerGroup: 'billing-order-confirmed' })
            .orIgnore()
            .execute();

          if (!result.identifiers.length) {
            return;
          }

          await this.billingService.chargeOrder(manager, event.orderId, event.totalAmount, event.currency);
        });

        await this.consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });
  }
}

orIgnore() — TypeORM-обёртка над INSERT ... ON CONFLICT DO NOTHING. Если identifiers.length === 0 — событие уже обработано, транзакция завершается без бизнес-изменений, offset коммитируется. Если identifiers.length > 0 — первый раз, обрабатываем.

commitOffsets вызывается после завершения транзакции: offset продвигается только при гарантированно успешной записи. autoCommit: false — обязательно.

Сценарий с падением между транзакцией и commitOffsets: процесс рестартует, kafkajs переполучает то же сообщение, orIgnore() ловит duplicate, offset коммитируется без повторной бизнес-операции. Всё корректно.

Money — двойная защита

R-KFK-IDEM-4: eventId на стороне consumer + Idempotency-Key на downstream HTTP.

// infrastructure/payment/payment.client.ts
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class PaymentClient {
  constructor(private readonly httpService: HttpService) {}

  async charge(params: {
    orderId: number;
    amount: number;
    currency: string;
    idempotencyKey: string;
  }): Promise<void> {
    await firstValueFrom(
      this.httpService.post('/v1/charges', {
        orderId: params.orderId,
        amount: params.amount,
        currency: params.currency,
      }, {
        headers: { 'Idempotency-Key': params.idempotencyKey },
      }),
    );
  }
}
// в BillingService.chargeOrder
await this.paymentClient.charge({
  orderId: event.orderId,
  amount: event.totalAmount,
  currency: event.currency,
  idempotencyKey: event.eventId,
});

Сценарий поломки: payment-provider ответил 200 OK, connection reset — consumer не получил ответ, транзакция откатилась, processed_event пуст. Следующий poll: orIgnore() даёт identifiers.length > 0, второй charge уходит к провайдеру с тем же Idempotency-Key = eventId. Провайдер дедуплицирует на своей стороне. Двойного списания нет.

TTL processed_event

Таблица растёт линейно с потоком событий. Без TTL — неограниченный рост.

Вариант 1 — partitioning:

CREATE TABLE processed_event (
    event_id       uuid        NOT NULL,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);

CREATE TABLE processed_event_2026_06
    PARTITION OF processed_event
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

CREATE TABLE processed_event_2026_07
    PARTITION OF processed_event
    FOR VALUES FROM ('2026-07-01') TO ('2026-08-01');

Старые партиции дропаем целиком — DROP TABLE processed_event_2026_05. Cleanup мгновенный (нет DELETE по миллионам строк).

Вариант 2 — @Interval background-job:

// infrastructure/kafka/processed-event-cleanup.service.ts
import { Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { ProcessedEventEntity } from './entity/processed-event.entity';

@Injectable()
export class ProcessedEventCleanupService {
  constructor(private readonly dataSource: DataSource) {}

  @Interval(24 * 60 * 60 * 1000)
  async cleanup(): Promise<void> {
    const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
    await this.dataSource
      .createQueryBuilder()
      .delete()
      .from(ProcessedEventEntity)
      .where('processedAt < :cutoff', { cutoff })
      .execute();
  }
}

Retention 7 дней достаточен для большинства случаев: дубли появляются в пределах часов при rebalance, не через недели. Для топиков с replay из DLQ — увеличить до 30 дней.

Что запрещено

АнтипаттернПравилоЧто взамен
eachMessage без проверки eventId для criticalR-KFK-IDEM-X1processed_event + orIgnore() insert
Kafka offset как dedup-ключR-KFK-IDEM-X2eventId UUID v7 в payload
Money без Idempotency-Key на downstream HTTPR-KFK-IDEM-4headers: { 'Idempotency-Key': event.eventId }
processed_event без TTLR-KFK-IDEM-2partitioning или @Interval cleanup
orIgnore() вне транзакции с бизнес-результатомR-KFK-IDEM-3dataSource.transaction(async (m) => { ... })
autoCommit: true (дефолт kafkajs!)R-KFK-CONS-X1autoCommit: false + commitOffsets после TX
Random UUID вместо UUID v7R-KFK-IDEM-1uuidv7() из пакета uuid

Куда дальше

  • Consumer — autoCommit: false, commitOffsets, groupId, heartbeat.
  • Event design — eventId в payload, zod-схема, forward-compatibility.
  • Outbox publishing — eventId генерируется producer-side в outbox.
  • Retry topic + DLQ — DLQ replay порождает дубли, dedup обязателен.
  • Observability — tracing traceparent в headers через @opentelemetry/instrumentation-kafkajs.
  • Конфигурация — KafkaConfig с zod, fail-fast на старте.
  • Producer — idempotent: true, acks: -1, key = aggregate id.
  • Security — TLS, ACL per-сервис, PII в restricted-топиках.