---
title: "Idempotent consumer — processed_event и dedup по eventId (Node/NestJS)"
nav_title: "Idempotent consumer"
excerpt: "Idempotent consumer в NestJS + kafkajs: processed_event с PK на event_id, orIgnore-insert и бизнес-результат в одной транзакции DataSource."
keywords: "Kafka idempotent consumer NestJS, kafkajs processed_event, eventId UUID v7, dedup orIgnore, at-least-once, R-KFK-IDEM, DataSource transaction TypeORM, Idempotency-Key money"
focus_keyword: "Kafka idempotent consumer NestJS kafkajs"
tags:
  - kafka
  - nestjs
  - typescript
  - idempotent-consumer
  - kafkajs
---

# Idempotent consumer — processed_event и dedup по eventId (Node/NestJS)

> **Опирается на правила:** `R-KFK-IDEM-1` … `R-KFK-IDEM-4` и `R-KFK-IDEM-X1` … `R-KFK-IDEM-X2` из Kafka Rules → [раздел 4. Idempotent consumer](/standards/backend/kafka/#4-idempotent-consumer).

> **Важно знать**
> - **Kafka — at-least-once**: дубликаты при rebalance, DLQ replay, offset reset — норма, не аномалия.
> - **Уникальный `eventId`** (UUID v7) в payload — единственный надёжный dedup-ключ.
> - **`processed_event` таблица** с PK на `event_id` — UNIQUE constraint дедуплицирует даже под race conditions.
> - **`orIgnore()` insert и бизнес-результат** — в **одной** `DataSource.transaction`.
> - **Money** — двойная защита: `eventId` + `Idempotency-Key` на downstream HTTP.
> - **TTL** через partitioning + drop_old, либо `@Interval` background-job.
> - **Kafka offset** как dedup-ключ запрещён: offset зависит от consumer-group.
> - **`autoCommit: false`** — offset коммитится только после успешной обработки, иначе дубль невидим.

Любой kafkajs-consumer обязан быть idempotent. Producer с `idempotent: true` снимает дубли на уровне producer-partition, но не защищает при rebalance до `commitOffsets`, при DLQ replay, при сбросе offset. Ответственность за «уже видели» лежит на стороне consumer.

## Уникальный `eventId` в payload

`R-KFK-IDEM-1`: каждое событие содержит UUID v7.

```ts
// core/order/domain/event/order-confirmed.event.ts
import { v7 as uuidv7 } from 'uuid';

export interface OrderConfirmedEvent {
  readonly eventId: string;        // UUID v7
  readonly eventType: string;      // 'order.confirmed.v1'
  readonly occurredAt: string;     // ISO-8601
  readonly aggregateType: 'Order';
  readonly orderId: number;
  readonly customerId: number;
  readonly totalAmount: number;
  readonly currency: string;
}

export function buildOrderConfirmedEvent(order: {
  id: number;
  customerId: number;
  totalAmount: number;
  currency: string;
  confirmedAt: Date;
}): OrderConfirmedEvent {
  return {
    eventId: uuidv7(),
    eventType: 'order.confirmed.v1',
    occurredAt: order.confirmedAt.toISOString(),
    aggregateType: 'Order',
    orderId: order.id,
    customerId: order.customerId,
    totalAmount: order.totalAmount,
    currency: order.currency,
  };
}
```

UUID v7 — time-sortable (первые 48 бит — timestamp). INSERT в `processed_event` по PK B-tree последовательный, фрагментация минимальна. По `eventId` можно восстановить хронологию без отдельного индекса по `occurredAt`.

## `processed_event` таблица

`R-KFK-IDEM-2`: DDL схема.

```sql
CREATE TABLE processed_event (
    event_id       uuid        PRIMARY KEY,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX ix_processed_event_processed_at
    ON processed_event (processed_at);
```

PK на `event_id` — UNIQUE constraint на уровне БД. Два параллельных `eachMessage` на одном дубле: один INSERT проходит, второй молча игнорируется (`orIgnore()`). Без этого оба дойдут до бизнес-логики.

Если несколько consumer-groups читают один топик и обрабатывают один `eventId` независимо (например, `billing-service` и `notification-service` оба слушают `orders.confirmed`), заменить PK:

```sql
ALTER TABLE processed_event DROP CONSTRAINT processed_event_pkey;
ALTER TABLE processed_event ADD PRIMARY KEY (event_id, consumer_group);
```

Тогда `billing-service` и `notification-service` хранят `eventId` раздельно и не мешают друг другу.

## TypeORM entity

```ts
// infrastructure/kafka/entity/processed-event.entity.ts
import { Column, Entity, PrimaryColumn } from 'typeorm';

@Entity('processed_event')
export class ProcessedEventEntity {
  @PrimaryColumn({ type: 'uuid', name: 'event_id' })
  eventId: string;

  @Column({ type: 'text', name: 'consumer_group' })
  consumerGroup: string;

  @Column({ type: 'timestamptz', name: 'processed_at', default: () => 'now()' })
  processedAt: Date;
}
```

## Dedup в `eachMessage`

`R-KFK-IDEM-3`: проверка и бизнес-результат в одной транзакции.

```ts
// infrastructure/kafka/listener/order-confirmed.listener.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { Kafka } from 'kafkajs';
import { z } from 'zod';
import { KafkaConfig } from '../config/kafka.config';
import { ProcessedEventEntity } from '../entity/processed-event.entity';
import { BillingService } from '../../billing/billing.service';

const orderConfirmedSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('order.confirmed.v1'),
  occurredAt: z.string().datetime(),
  orderId: z.number().int().positive(),
  customerId: z.number().int().positive(),
  totalAmount: z.number().positive(),
  currency: z.string().length(3),
});

@Injectable()
export class OrderConfirmedListener implements OnApplicationBootstrap {
  private readonly consumer;

  constructor(
    private readonly kafkaConfig: KafkaConfig,
    private readonly dataSource: DataSource,
    private readonly billingService: BillingService,
  ) {
    const kafka = new Kafka({
      clientId: kafkaConfig.clientId,
      brokers: kafkaConfig.brokers,
      ssl: kafkaConfig.ssl,
    });
    this.consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      partitionsConsumedConcurrently: 3,
      eachMessage: async ({ topic, partition, message }) => {
        const event = orderConfirmedSchema.parse(
          JSON.parse(message.value!.toString()),
        );

        await this.dataSource.transaction(async (manager) => {
          const result = await manager
            .createQueryBuilder()
            .insert()
            .into(ProcessedEventEntity)
            .values({ eventId: event.eventId, consumerGroup: 'billing-order-confirmed' })
            .orIgnore()
            .execute();

          if (!result.identifiers.length) {
            return;
          }

          await this.billingService.chargeOrder(manager, event.orderId, event.totalAmount, event.currency);
        });

        await this.consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });
  }
}
```

`orIgnore()` — TypeORM-обёртка над `INSERT ... ON CONFLICT DO NOTHING`. Если `identifiers.length === 0` — событие уже обработано, транзакция завершается без бизнес-изменений, offset коммитируется. Если `identifiers.length > 0` — первый раз, обрабатываем.

`commitOffsets` вызывается **после** завершения транзакции: offset продвигается только при гарантированно успешной записи. `autoCommit: false` — обязательно.

Сценарий с падением между транзакцией и `commitOffsets`: процесс рестартует, kafkajs переполучает то же сообщение, `orIgnore()` ловит duplicate, offset коммитируется без повторной бизнес-операции. Всё корректно.

## Money — двойная защита

`R-KFK-IDEM-4`: `eventId` на стороне consumer + `Idempotency-Key` на downstream HTTP.

```ts
// infrastructure/payment/payment.client.ts
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class PaymentClient {
  constructor(private readonly httpService: HttpService) {}

  async charge(params: {
    orderId: number;
    amount: number;
    currency: string;
    idempotencyKey: string;
  }): Promise<void> {
    await firstValueFrom(
      this.httpService.post('/v1/charges', {
        orderId: params.orderId,
        amount: params.amount,
        currency: params.currency,
      }, {
        headers: { 'Idempotency-Key': params.idempotencyKey },
      }),
    );
  }
}
```

```ts
// в BillingService.chargeOrder
await this.paymentClient.charge({
  orderId: event.orderId,
  amount: event.totalAmount,
  currency: event.currency,
  idempotencyKey: event.eventId,
});
```

Сценарий поломки: payment-provider ответил 200 OK, connection reset — consumer не получил ответ, транзакция откатилась, `processed_event` пуст. Следующий poll: `orIgnore()` даёт `identifiers.length > 0`, второй charge уходит к провайдеру с тем же `Idempotency-Key = eventId`. Провайдер дедуплицирует на своей стороне. Двойного списания нет.

## TTL `processed_event`

Таблица растёт линейно с потоком событий. Без TTL — неограниченный рост.

**Вариант 1 — partitioning:**

```sql
CREATE TABLE processed_event (
    event_id       uuid        NOT NULL,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (event_id, processed_at)
) PARTITION BY RANGE (processed_at);

CREATE TABLE processed_event_2026_06
    PARTITION OF processed_event
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

CREATE TABLE processed_event_2026_07
    PARTITION OF processed_event
    FOR VALUES FROM ('2026-07-01') TO ('2026-08-01');
```

Старые партиции дропаем целиком — `DROP TABLE processed_event_2026_05`. Cleanup мгновенный (нет DELETE по миллионам строк).

**Вариант 2 — `@Interval` background-job:**

```ts
// infrastructure/kafka/processed-event-cleanup.service.ts
import { Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { ProcessedEventEntity } from './entity/processed-event.entity';

@Injectable()
export class ProcessedEventCleanupService {
  constructor(private readonly dataSource: DataSource) {}

  @Interval(24 * 60 * 60 * 1000)
  async cleanup(): Promise<void> {
    const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
    await this.dataSource
      .createQueryBuilder()
      .delete()
      .from(ProcessedEventEntity)
      .where('processedAt < :cutoff', { cutoff })
      .execute();
  }
}
```

Retention 7 дней достаточен для большинства случаев: дубли появляются в пределах часов при rebalance, не через недели. Для топиков с replay из DLQ — увеличить до 30 дней.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `eachMessage` без проверки `eventId` для critical | `R-KFK-IDEM-X1` | `processed_event` + `orIgnore()` insert |
| Kafka offset как dedup-ключ | `R-KFK-IDEM-X2` | `eventId` UUID v7 в payload |
| Money без `Idempotency-Key` на downstream HTTP | `R-KFK-IDEM-4` | `headers: { 'Idempotency-Key': event.eventId }` |
| `processed_event` без TTL | `R-KFK-IDEM-2` | partitioning или `@Interval` cleanup |
| `orIgnore()` вне транзакции с бизнес-результатом | `R-KFK-IDEM-3` | `dataSource.transaction(async (m) => { ... })` |
| `autoCommit: true` (дефолт kafkajs!) | `R-KFK-CONS-X1` | `autoCommit: false` + `commitOffsets` после TX |
| Random UUID вместо UUID v7 | `R-KFK-IDEM-1` | `uuidv7()` из пакета `uuid` |

## Куда дальше

- [Consumer](node/consumer.md) — `autoCommit: false`, `commitOffsets`, `groupId`, `heartbeat`.
- [Event design](node/event-design.md) — `eventId` в payload, zod-схема, forward-compatibility.
- [Outbox publishing](node/outbox-publishing.md) — `eventId` генерируется producer-side в outbox.
- [Retry topic + DLQ](node/retry-and-dlq.md) — DLQ replay порождает дубли, dedup обязателен.
- [Observability](node/observability.md) — tracing `traceparent` в headers через `@opentelemetry/instrumentation-kafkajs`.
- [Конфигурация](node/configuration.md) — `KafkaConfig` с zod, fail-fast на старте.
- [Producer](node/producer.md) — `idempotent: true`, `acks: -1`, key = aggregate id.
- [Security](node/security.md) — TLS, ACL per-сервис, PII в restricted-топиках.
