Опирается на правила:
R-DIST-IDEM-1…R-DIST-IDEM-5иR-DIST-IDEM-X1…R-DIST-IDEM-X3из Distributed Patterns Style Guide → раздел 3. Idempotency.
Важно знать
- Распределённая система всегда at-least-once — kafkajs rebalance, HTTP retry, сетевой тайм-аут повторяют доставку. Receiver обязан быть идемпотентным.
- Каждое cross-service сообщение несёт уникальный ID: Kafka —
eventIdUUID v7, HTTP money —Idempotency-Keyheader, шаг саги —sagaId + stepName.- Receiver хранит processed-events в таблице
processed_event; вставка через.orIgnore()— UNIQUE-дедупликация под race condition без явногоSELECT + INSERT.- HTTP-команды — таблица
idempotency_record (idempotency_key, command_hash, response): повтор с тем же ключом возвращает сохранённый ответ; тот же ключ + другое тело →409 Conflict.- Money — двойная защита: client
Idempotency-Key+ внутренний UNIQUE(payment_provider, external_payment_id)на уровне БД.- TTL 24–72 часа для idempotency-records: короче — реальный retry клиента не проходит дедупликацию; длиннее — таблица растёт без пользы.
- Producer тоже несёт ответственность: kafkajs
idempotent: trueпредотвращает дубли на стороне брокера; receiver-side dedup один не спасает, если producer публикует два разныхeventIdдля одной операции.- NestJS
@nestjs/cqrsEventBus — in-memory, не распределённый: дедупликация через него бессмысленна; нужнаDataSource.transactionсprocessed_event.
В распределённой системе нет «доставлено ровно один раз». Сеть теряет ACK, kafkajs повторяет при rebalance, HTTP-клиент retry-ит. Единственный способ выжить — каждый receiver проверяет, не обработал ли он это сообщение, и при повторе возвращает тот же результат.
Уникальный ID на каждое сообщение
R-DIST-IDEM-1: каждое cross-service сообщение обязано иметь уникальный ID.
| Транспорт | ID | Источник |
|---|---|---|
| Kafka event | eventId UUID v7 | producer генерирует |
| HTTP money command | Idempotency-Key header | client генерирует |
| Saga step | sagaId + stepName | orchestrator знает |
UUID v7 включает timestamp в первых 48 битах — монотонно растущие значения дают последовательные INSERT в PG (низкая фрагментация B-tree) и позволяют сортировать по времени создания без дополнительного поля.
import { randomUUID } from 'node:crypto';
// PREFER: eventId UUID v7 через uuidv7() из библиотеки uuidv7
// или fallback UUID v4 из node:crypto — главное генерировать ОДИН РАЗ на операцию
export class OrderCreatedEvent {
readonly eventId: string = randomUUID(); // UUID v4 — подходит для dedup
readonly sagaId: string;
readonly eventType = 'OrderCreated.v1';
readonly orderId: number;
readonly customerId: number;
readonly amount: string;
readonly occurredAt: Date = new Date();
constructor(sagaId: string, orderId: number, customerId: number, amount: string) {
this.sagaId = sagaId;
this.orderId = orderId;
this.customerId = customerId;
this.amount = amount;
}
}
processed_event для kafkajs consumer
R-DIST-IDEM-2: receiver хранит обработанные eventId в БД и проверяет перед обработкой в той же транзакции.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
consumer_name text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);
TypeORM-entity для таблицы:
@Entity('processed_event')
export class ProcessedEventEntity {
@PrimaryColumn({ name: 'event_id', type: 'uuid' })
eventId: string;
@Column({ name: 'consumer_name' })
consumerName: string;
@CreateDateColumn({ name: 'processed_at', type: 'timestamptz' })
processedAt: Date;
}
Consumer в NestJS — получает сообщение через kafkajs, вся обработка внутри DataSource.transaction:
@Injectable()
export class OrderCreatedConsumer implements OnModuleInit {
constructor(
private readonly dataSource: DataSource,
private readonly orderProjectionRepository: OrderProjectionRepository,
) {}
async onModuleInit() {
// kafkajs consumer подключается отдельно через @Module providers
}
async handleMessage(eventId: string, payload: OrderCreatedEvent): Promise<void> {
await this.dataSource.transaction(async (manager) => {
// PREFER: INSERT ... ON CONFLICT DO NOTHING — дедуп под race без SELECT
const result = await manager
.createQueryBuilder()
.insert()
.into(ProcessedEventEntity)
.values({ eventId, consumerName: 'order-projection' })
.orIgnore()
.execute();
if (result.raw.length === 0) {
return; // уже обработано — skip
}
await this.orderProjectionRepository.upsert(manager, toProjection(payload));
});
}
}
.orIgnore() транслируется в INSERT ... ON CONFLICT DO NOTHING — атомарно и безопасно под параллельными consumer-потоками. result.raw.length === 0 означает конфликт (дубль) → skip.
Idempotency-Key для HTTP-команд
R-DIST-IDEM-3: для HTTP-команд receiver хранит (idempotency_key, command_hash, response) тройку.
CREATE TABLE idempotency_record (
idempotency_key text PRIMARY KEY,
command_hash text NOT NULL,
response jsonb NOT NULL,
http_status int NOT NULL,
created_at timestamptz NOT NULL DEFAULT now()
);
В NestJS удобно вынести логику дедупликации в interceptor, чтобы не дублировать в каждом контроллере:
@Injectable()
export class IdempotencyInterceptor implements NestInterceptor {
constructor(private readonly idempotencyRepository: IdempotencyRepository) {}
async intercept(context: ExecutionContext, next: CallHandler): Promise<Observable<unknown>> {
const request = context.switchToHttp().getRequest<Request>();
const key = request.headers['idempotency-key'] as string | undefined;
if (!key) {
throw new BadRequestException('Idempotency-Key header required');
}
const commandHash = computeHash(request.body);
const existing = await this.idempotencyRepository.findByKey(key);
if (existing) {
if (existing.commandHash !== commandHash) {
throw new ConflictException(`Idempotency-Key already used with different payload`);
}
const response = context.switchToHttp().getResponse<Response>();
response.status(existing.httpStatus).json(existing.response);
return EMPTY;
}
return next.handle().pipe(
tap(async (responseBody) => {
const httpResponse = context.switchToHttp().getResponse<Response>();
await this.idempotencyRepository.save({
idempotencyKey: key,
commandHash,
response: responseBody,
httpStatus: httpResponse.statusCode,
});
}),
);
}
}
Применение на write-эндпоинтах:
@Controller('payments')
@UseInterceptors(IdempotencyInterceptor)
export class PaymentController {
constructor(private readonly dispatcher: CommandBus) {}
@Post()
@HttpCode(201)
async charge(@Body() dto: ChargeRequestDto): Promise<PaymentResponseDto> {
const result = await this.dispatcher.execute(new ChargePaymentCommand(dto));
return PaymentResponseDto.from(result);
}
}
Три случая обработки:
- Ключ не встречался — команда выполняется, результат сохраняется, ответ возвращается.
- Ключ встречался + то же тело — сохранённый response возвращается без повторного выполнения.
- Ключ встречался + другое тело —
409 Conflict. Клиент неправильно переиспользует ключ.
Двойная защита для money
R-DIST-IDEM-4: money-операции защищаются дважды: client Idempotency-Key + внутренний UNIQUE constraint в БД.
CREATE TABLE payment (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
payment_provider text NOT NULL,
external_payment_id text NOT NULL,
amount numeric(19,4) NOT NULL,
status text NOT NULL,
customer_id bigint NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (payment_provider, external_payment_id)
);
TypeORM entity для платежа Сбера:
@Entity('payment')
export class PaymentEntity {
@PrimaryGeneratedColumn({ name: 'id', type: 'bigint' })
id: number;
@Column({ name: 'payment_provider' })
paymentProvider: string;
@Column({ name: 'external_payment_id' })
externalPaymentId: string;
@Column({ name: 'amount', type: 'numeric', precision: 19, scale: 4 })
amount: string;
@Column({ name: 'status' })
status: string;
@Column({ name: 'customer_id', type: 'bigint' })
customerId: number;
@CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
createdAt: Date;
}
Если Idempotency-Key не помог (клиент использовал разные ключи при retry), UNIQUE (payment_provider, external_payment_id) ловит дубль на уровне БД:
@Injectable()
export class CreatePaymentHandler implements ICommandHandler<ChargePaymentCommand> {
constructor(
@InjectRepository(PaymentEntity)
private readonly paymentRepository: Repository<PaymentEntity>,
) {}
async execute(command: ChargePaymentCommand): Promise<PaymentResult> {
try {
const payment = this.paymentRepository.create({
paymentProvider: command.provider,
externalPaymentId: command.externalId,
amount: command.amount.toFixed(4),
status: 'PENDING',
customerId: command.customerId,
});
await this.paymentRepository.save(payment);
return PaymentResult.created(payment.id);
} catch (error) {
if (isUniqueViolation(error, 'payment_payment_provider_external_payment_id_key')) {
const existing = await this.paymentRepository.findOneByOrFail({
paymentProvider: command.provider,
externalPaymentId: command.externalId,
});
return PaymentResult.existing(existing.id);
}
throw error;
}
}
}
isUniqueViolation — утилита, проверяющая error.code === '23505' (PostgreSQL) и имя constraint.
TTL для idempotency-records
R-DIST-IDEM-5: idempotency-records хранятся 24–72 часа.
- Меньше 24 часов — реальный retry клиента (через час после сетевого сбоя) не проходит дедупликацию → списали дважды.
- Больше 72 часов — таблица растёт, нагрузка на autovacuum, размер индекса.
Cleanup через NestJS @Cron:
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { LessThan } from 'typeorm';
@Injectable()
export class IdempotencyCleanupService {
constructor(
@InjectRepository(IdempotencyRecordEntity)
private readonly idempotencyRepository: Repository<IdempotencyRecordEntity>,
) {}
@Cron(CronExpression.EVERY_DAY_AT_3AM)
async cleanup(): Promise<void> {
const cutoff = new Date(Date.now() - 72 * 60 * 60 * 1000);
await this.idempotencyRepository.delete({ createdAt: LessThan(cutoff) });
}
}
@nestjs/schedule подключается в ScheduleModule.forRoot() в корневом AppModule. Cleanup-job работает ночью — не добавляет overhead на каждую запись.
Producer-side: kafkajs idempotent producer
R-DIST-IDEM-X2: receiver-side dedup не спасает, если producer публикует два разных eventId для одной операции при partial failure.
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const producer = kafka.producer({
idempotent: true, // R-KFK-PROD-1 — Kafka acks=all + sequence numbers
maxInFlightRequests: 5,
retry: { retries: Number.MAX_SAFE_INTEGER },
});
С idempotent: true kafkajs гарантирует exactly-once на уровне брокера: при retry одно и то же сообщение не дублируется — брокер отклоняет повторную запись по sequence number. eventId в payload при этом остаётся тем же.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Receiver без dedup для money / critical-команд | R-DIST-IDEM-X1 | processed_event + Idempotency-Key interceptor |
Только receiver-side dedup, producer без idempotent: true | R-DIST-IDEM-X2 | kafkajs idempotent: true + receiver dedup |
randomUUID() при каждом retry на клиенте | R-DIST-IDEM-X3 | один ключ генерируется на бизнес-операцию, переиспользуется |
| TTL idempotency < 24 ч | R-DIST-IDEM-5 | 24–72 часа |
| TTL idempotency > 72 ч без причины | R-DIST-IDEM-5 | 24–72 часа + @Cron cleanup |
| Один слой защиты для money | R-DIST-IDEM-4 | client key + UNIQUE constraint на БД |
SELECT перед INSERT без транзакции (TOCTOU race) | R-DIST-IDEM-2 | .orIgnore() — атомарный INSERT ... ON CONFLICT DO NOTHING |
Дедупликация через NestJS EventBus (@nestjs/cqrs) | R-DIST-IDEM-2 | in-memory, не персистентный — только DataSource.transaction с processed_event |
Куда дальше
- Сага — каждый шаг саги обязан быть идемпотентным.
- Компенсация — compensation тоже идемпотентен, saga повторяет его при retry.
- Outbox + Inbox — outbox решает producer-side гарантии;
processed_event— это inbox для Kafka. - Eventual consistency — read-проекция строится на тех же идемпотентных consumer-ах.
- Distributed transactions — почему 2PC не работает и как saga с idempotency его заменяет.
- Когда нужны распределённые паттерны — idempotency нужна только при cross-service операции.