Опирается на правила: R-DIST-IDEM-1R-DIST-IDEM-5 и R-DIST-IDEM-X1R-DIST-IDEM-X3 из Distributed Patterns Style Guide → раздел 3. Idempotency.

Важно знать

  • Распределённая система всегда at-least-once — kafkajs rebalance, HTTP retry, сетевой тайм-аут повторяют доставку. Receiver обязан быть идемпотентным.
  • Каждое cross-service сообщение несёт уникальный ID: Kafka — eventId UUID v7, HTTP money — Idempotency-Key header, шаг саги — sagaId + stepName.
  • Receiver хранит processed-events в таблице processed_event; вставка через .orIgnore() — UNIQUE-дедупликация под race condition без явного SELECT + INSERT.
  • HTTP-команды — таблица idempotency_record (idempotency_key, command_hash, response): повтор с тем же ключом возвращает сохранённый ответ; тот же ключ + другое тело → 409 Conflict.
  • Money — двойная защита: client Idempotency-Key + внутренний UNIQUE (payment_provider, external_payment_id) на уровне БД.
  • TTL 24–72 часа для idempotency-records: короче — реальный retry клиента не проходит дедупликацию; длиннее — таблица растёт без пользы.
  • Producer тоже несёт ответственность: kafkajs idempotent: true предотвращает дубли на стороне брокера; receiver-side dedup один не спасает, если producer публикует два разных eventId для одной операции.
  • NestJS @nestjs/cqrs EventBus — in-memory, не распределённый: дедупликация через него бессмысленна; нужна DataSource.transaction с processed_event.

В распределённой системе нет «доставлено ровно один раз». Сеть теряет ACK, kafkajs повторяет при rebalance, HTTP-клиент retry-ит. Единственный способ выжить — каждый receiver проверяет, не обработал ли он это сообщение, и при повторе возвращает тот же результат.

Уникальный ID на каждое сообщение

R-DIST-IDEM-1: каждое cross-service сообщение обязано иметь уникальный ID.

ТранспортIDИсточник
Kafka eventeventId UUID v7producer генерирует
HTTP money commandIdempotency-Key headerclient генерирует
Saga stepsagaId + stepNameorchestrator знает

UUID v7 включает timestamp в первых 48 битах — монотонно растущие значения дают последовательные INSERT в PG (низкая фрагментация B-tree) и позволяют сортировать по времени создания без дополнительного поля.

import { randomUUID } from 'node:crypto';

// PREFER: eventId UUID v7 через uuidv7() из библиотеки uuidv7
// или fallback UUID v4 из node:crypto — главное генерировать ОДИН РАЗ на операцию
export class OrderCreatedEvent {
  readonly eventId: string = randomUUID();   // UUID v4 — подходит для dedup
  readonly sagaId: string;
  readonly eventType = 'OrderCreated.v1';
  readonly orderId: number;
  readonly customerId: number;
  readonly amount: string;
  readonly occurredAt: Date = new Date();

  constructor(sagaId: string, orderId: number, customerId: number, amount: string) {
    this.sagaId = sagaId;
    this.orderId = orderId;
    this.customerId = customerId;
    this.amount = amount;
  }
}

processed_event для kafkajs consumer

R-DIST-IDEM-2: receiver хранит обработанные eventId в БД и проверяет перед обработкой в той же транзакции.

CREATE TABLE processed_event (
    event_id      uuid PRIMARY KEY,
    consumer_name text NOT NULL,
    processed_at  timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);

TypeORM-entity для таблицы:

@Entity('processed_event')
export class ProcessedEventEntity {
  @PrimaryColumn({ name: 'event_id', type: 'uuid' })
  eventId: string;

  @Column({ name: 'consumer_name' })
  consumerName: string;

  @CreateDateColumn({ name: 'processed_at', type: 'timestamptz' })
  processedAt: Date;
}

Consumer в NestJS — получает сообщение через kafkajs, вся обработка внутри DataSource.transaction:

@Injectable()
export class OrderCreatedConsumer implements OnModuleInit {
  constructor(
    private readonly dataSource: DataSource,
    private readonly orderProjectionRepository: OrderProjectionRepository,
  ) {}

  async onModuleInit() {
    // kafkajs consumer подключается отдельно через @Module providers
  }

  async handleMessage(eventId: string, payload: OrderCreatedEvent): Promise<void> {
    await this.dataSource.transaction(async (manager) => {
      // PREFER: INSERT ... ON CONFLICT DO NOTHING — дедуп под race без SELECT
      const result = await manager
        .createQueryBuilder()
        .insert()
        .into(ProcessedEventEntity)
        .values({ eventId, consumerName: 'order-projection' })
        .orIgnore()
        .execute();

      if (result.raw.length === 0) {
        return; // уже обработано — skip
      }

      await this.orderProjectionRepository.upsert(manager, toProjection(payload));
    });
  }
}

.orIgnore() транслируется в INSERT ... ON CONFLICT DO NOTHING — атомарно и безопасно под параллельными consumer-потоками. result.raw.length === 0 означает конфликт (дубль) → skip.

Idempotency-Key для HTTP-команд

R-DIST-IDEM-3: для HTTP-команд receiver хранит (idempotency_key, command_hash, response) тройку.

CREATE TABLE idempotency_record (
    idempotency_key  text PRIMARY KEY,
    command_hash     text NOT NULL,
    response         jsonb NOT NULL,
    http_status      int NOT NULL,
    created_at       timestamptz NOT NULL DEFAULT now()
);

В NestJS удобно вынести логику дедупликации в interceptor, чтобы не дублировать в каждом контроллере:

@Injectable()
export class IdempotencyInterceptor implements NestInterceptor {
  constructor(private readonly idempotencyRepository: IdempotencyRepository) {}

  async intercept(context: ExecutionContext, next: CallHandler): Promise<Observable<unknown>> {
    const request = context.switchToHttp().getRequest<Request>();
    const key = request.headers['idempotency-key'] as string | undefined;

    if (!key) {
      throw new BadRequestException('Idempotency-Key header required');
    }

    const commandHash = computeHash(request.body);
    const existing = await this.idempotencyRepository.findByKey(key);

    if (existing) {
      if (existing.commandHash !== commandHash) {
        throw new ConflictException(`Idempotency-Key already used with different payload`);
      }
      const response = context.switchToHttp().getResponse<Response>();
      response.status(existing.httpStatus).json(existing.response);
      return EMPTY;
    }

    return next.handle().pipe(
      tap(async (responseBody) => {
        const httpResponse = context.switchToHttp().getResponse<Response>();
        await this.idempotencyRepository.save({
          idempotencyKey: key,
          commandHash,
          response: responseBody,
          httpStatus: httpResponse.statusCode,
        });
      }),
    );
  }
}

Применение на write-эндпоинтах:

@Controller('payments')
@UseInterceptors(IdempotencyInterceptor)
export class PaymentController {
  constructor(private readonly dispatcher: CommandBus) {}

  @Post()
  @HttpCode(201)
  async charge(@Body() dto: ChargeRequestDto): Promise<PaymentResponseDto> {
    const result = await this.dispatcher.execute(new ChargePaymentCommand(dto));
    return PaymentResponseDto.from(result);
  }
}

Три случая обработки:

  • Ключ не встречался — команда выполняется, результат сохраняется, ответ возвращается.
  • Ключ встречался + то же тело — сохранённый response возвращается без повторного выполнения.
  • Ключ встречался + другое тело409 Conflict. Клиент неправильно переиспользует ключ.

Двойная защита для money

R-DIST-IDEM-4: money-операции защищаются дважды: client Idempotency-Key + внутренний UNIQUE constraint в БД.

CREATE TABLE payment (
    id                    bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    payment_provider      text NOT NULL,
    external_payment_id   text NOT NULL,
    amount                numeric(19,4) NOT NULL,
    status                text NOT NULL,
    customer_id           bigint NOT NULL,
    created_at            timestamptz NOT NULL DEFAULT now(),
    UNIQUE (payment_provider, external_payment_id)
);

TypeORM entity для платежа Сбера:

@Entity('payment')
export class PaymentEntity {
  @PrimaryGeneratedColumn({ name: 'id', type: 'bigint' })
  id: number;

  @Column({ name: 'payment_provider' })
  paymentProvider: string;

  @Column({ name: 'external_payment_id' })
  externalPaymentId: string;

  @Column({ name: 'amount', type: 'numeric', precision: 19, scale: 4 })
  amount: string;

  @Column({ name: 'status' })
  status: string;

  @Column({ name: 'customer_id', type: 'bigint' })
  customerId: number;

  @CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
  createdAt: Date;
}

Если Idempotency-Key не помог (клиент использовал разные ключи при retry), UNIQUE (payment_provider, external_payment_id) ловит дубль на уровне БД:

@Injectable()
export class CreatePaymentHandler implements ICommandHandler<ChargePaymentCommand> {
  constructor(
    @InjectRepository(PaymentEntity)
    private readonly paymentRepository: Repository<PaymentEntity>,
  ) {}

  async execute(command: ChargePaymentCommand): Promise<PaymentResult> {
    try {
      const payment = this.paymentRepository.create({
        paymentProvider: command.provider,
        externalPaymentId: command.externalId,
        amount: command.amount.toFixed(4),
        status: 'PENDING',
        customerId: command.customerId,
      });
      await this.paymentRepository.save(payment);
      return PaymentResult.created(payment.id);
    } catch (error) {
      if (isUniqueViolation(error, 'payment_payment_provider_external_payment_id_key')) {
        const existing = await this.paymentRepository.findOneByOrFail({
          paymentProvider: command.provider,
          externalPaymentId: command.externalId,
        });
        return PaymentResult.existing(existing.id);
      }
      throw error;
    }
  }
}

isUniqueViolation — утилита, проверяющая error.code === '23505' (PostgreSQL) и имя constraint.

TTL для idempotency-records

R-DIST-IDEM-5: idempotency-records хранятся 24–72 часа.

  • Меньше 24 часов — реальный retry клиента (через час после сетевого сбоя) не проходит дедупликацию → списали дважды.
  • Больше 72 часов — таблица растёт, нагрузка на autovacuum, размер индекса.

Cleanup через NestJS @Cron:

import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { LessThan } from 'typeorm';

@Injectable()
export class IdempotencyCleanupService {
  constructor(
    @InjectRepository(IdempotencyRecordEntity)
    private readonly idempotencyRepository: Repository<IdempotencyRecordEntity>,
  ) {}

  @Cron(CronExpression.EVERY_DAY_AT_3AM)
  async cleanup(): Promise<void> {
    const cutoff = new Date(Date.now() - 72 * 60 * 60 * 1000);
    await this.idempotencyRepository.delete({ createdAt: LessThan(cutoff) });
  }
}

@nestjs/schedule подключается в ScheduleModule.forRoot() в корневом AppModule. Cleanup-job работает ночью — не добавляет overhead на каждую запись.

Producer-side: kafkajs idempotent producer

R-DIST-IDEM-X2: receiver-side dedup не спасает, если producer публикует два разных eventId для одной операции при partial failure.

const kafka = new Kafka({ brokers: ['kafka:9092'] });

const producer = kafka.producer({
  idempotent: true,                // R-KFK-PROD-1 — Kafka acks=all + sequence numbers
  maxInFlightRequests: 5,
  retry: { retries: Number.MAX_SAFE_INTEGER },
});

С idempotent: true kafkajs гарантирует exactly-once на уровне брокера: при retry одно и то же сообщение не дублируется — брокер отклоняет повторную запись по sequence number. eventId в payload при этом остаётся тем же.

Что запрещено

АнтипаттернПравилоЧто взамен
Receiver без dedup для money / critical-командR-DIST-IDEM-X1processed_event + Idempotency-Key interceptor
Только receiver-side dedup, producer без idempotent: trueR-DIST-IDEM-X2kafkajs idempotent: true + receiver dedup
randomUUID() при каждом retry на клиентеR-DIST-IDEM-X3один ключ генерируется на бизнес-операцию, переиспользуется
TTL idempotency < 24 чR-DIST-IDEM-524–72 часа
TTL idempotency > 72 ч без причиныR-DIST-IDEM-524–72 часа + @Cron cleanup
Один слой защиты для moneyR-DIST-IDEM-4client key + UNIQUE constraint на БД
SELECT перед INSERT без транзакции (TOCTOU race)R-DIST-IDEM-2.orIgnore() — атомарный INSERT ... ON CONFLICT DO NOTHING
Дедупликация через NestJS EventBus (@nestjs/cqrs)R-DIST-IDEM-2in-memory, не персистентный — только DataSource.transaction с processed_event

Куда дальше

  • Сага — каждый шаг саги обязан быть идемпотентным.
  • Компенсация — compensation тоже идемпотентен, saga повторяет его при retry.
  • Outbox + Inbox — outbox решает producer-side гарантии; processed_event — это inbox для Kafka.
  • Eventual consistency — read-проекция строится на тех же идемпотентных consumer-ах.
  • Distributed transactions — почему 2PC не работает и как saga с idempotency его заменяет.
  • Когда нужны распределённые паттерны — idempotency нужна только при cross-service операции.