Опирается на правила:
R-DIST-OBX-1…R-DIST-OBX-3иR-DIST-OBX-X1…R-DIST-OBX-X2из Distributed Patterns — раздел 5. Outbox + Inbox.
Важно знать
- Outbox решает «DB commit + message publish атомарно»: одна
DataSource.transactionпишет доменную запись иINSERTвoutbox_event.- NestJS не даёт декларативных транзакций
@Transactional— весь outbox-код внутри явногоdataSource.transaction(async (m) => { ... }).- Outbox-relay — отдельный
@InjectableсsetIntervalили@Cron: выбирает unpublished батч черезFOR UPDATE SKIP LOCKEDQueryBuilder, публикует через kafkajsproducer.send, обновляетpublished_atв той же транзакции.- После публикации строка остаётся в таблице (audit + rebuild); удалять через retention-job, не сразу.
- Inbox — обратная сторона: consumer пишет сообщение в
inbox_eventсprocessed = false, отдельный handler обрабатывает вDataSource.transaction. Используется только для critical-потоков (финансы, высокий burst).- Дефолт вместо inbox —
processed_eventdedup:INSERT ... ON CONFLICT DO NOTHING(TypeORM.orIgnore()) в той же транзакции, что и обработка события.- Single source of truth — PG сервиса. kafkajs-producer — транспорт, не источник правды.
- Запрет:
producer.send()из command-handler напрямую и TypeORM subscriberafterInsert/afterUpdate как замена outbox.
Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.
Проблема «commit + publish»
Что хочется:
// src/order/handlers/create-order.handler.ts
@Injectable()
export class CreateOrderHandler {
async handle(command: CreateOrderCommand): Promise<Order> {
const order = await this.orderRepository.save(Order.create(command));
await this.producer.send({
topic: 'order.events',
messages: [{ key: order.id, value: JSON.stringify(new OrderCreatedEvent(order)) }],
});
return order;
}
}
Что не работает:
- DB commit прошёл,
producer.sendупал → событие потеряно, downstream не знает обOrder. producer.sendпрошёл, DB commit упал (constraint violation, deadlock) → событие в Kafka, заказа в PG нет → downstream обрабатывает фантом.- Network partition между этими двумя — гарантий никаких.
Distributed transaction между PG и Kafka не существует: kafkajs не поддерживает XA, TypeORM не координирует Kafka-producer. Outbox решает это через локальную DataSource.transaction.
Outbox pattern
R-DIST-OBX-1: outbox для исходящих событий обязателен.
Схема outbox-таблицы
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id) WHERE published_at IS NULL;
Partial index WHERE published_at IS NULL — relay сканирует только unpublished строки. После публикации строка исчезает из «горячего» индекса, но остаётся в таблице для audit.
TypeORM entity
// src/outbox/outbox-event.entity.ts
@Entity('outbox_event')
export class OutboxEventEntity {
@PrimaryGeneratedColumn({ type: 'bigint' })
id: string;
@Column({ type: 'uuid', unique: true })
eventId: string;
@Column()
aggregateType: string;
@Column()
aggregateId: string;
@Column()
eventType: string;
@Column({ type: 'jsonb' })
payload: object;
@Column()
topic: string;
@Column()
partitionKey: string;
@CreateDateColumn({ type: 'timestamptz' })
createdAt: Date;
@Column({ type: 'timestamptz', nullable: true })
publishedAt: Date | null;
}
OutboxWriter
// src/outbox/outbox-writer.ts
@Injectable()
export class OutboxWriter {
write(
manager: EntityManager,
opts: { eventId?: string; aggregateType: string; aggregateId: string; eventType: string; payload: object; topic: string; partitionKey: string },
): Promise<InsertResult> {
return manager.getRepository(OutboxEventEntity).insert({
eventId: opts.eventId ?? randomUUID(),
aggregateType: opts.aggregateType,
aggregateId: opts.aggregateId,
eventType: opts.eventType,
payload: opts.payload,
topic: opts.topic,
partitionKey: opts.partitionKey,
publishedAt: null,
});
}
}
OutboxWriter принимает EntityManager (уже открытая транзакция) — не открывает новую. Вся ответственность за транзакцию на стороне вызывающего handler-а.
Write-handler
// src/order/handlers/create-order.handler.ts
@Injectable()
export class CreateOrderHandler {
constructor(
private readonly dataSource: DataSource,
private readonly outbox: OutboxWriter,
) {}
async handle(command: CreateOrderCommand): Promise<Order> {
return this.dataSource.transaction(async (m) => {
const order = m.getRepository(OrderEntity).create({
id: randomUUID(),
customerId: command.customerId,
amount: command.amount,
status: 'pending',
createdAt: new Date(),
});
await m.save(order);
await this.outbox.write(m, {
aggregateType: 'Order',
aggregateId: order.id,
eventType: 'OrderCreated',
payload: {
orderId: order.id,
customerId: order.customerId,
amount: order.amount,
},
topic: 'order.events',
partitionKey: order.id,
});
return order;
});
}
}
INSERT order и INSERT outbox_event — одна DataSource.transaction. Либо оба commit, либо оба откат. Нет состояния «заказ есть, событие потеряно».
Outbox-relay
// src/outbox/outbox-relay.service.ts
@Injectable()
export class OutboxRelayService implements OnModuleInit, OnModuleDestroy {
private timer: NodeJS.Timeout | null = null;
constructor(
private readonly dataSource: DataSource,
private readonly kafka: Kafka,
) {}
onModuleInit(): void {
this.timer = setInterval(() => this.publish().catch(noop), 500);
}
onModuleDestroy(): void {
if (this.timer) clearInterval(this.timer);
}
private async publish(): Promise<void> {
await this.dataSource.transaction(async (m) => {
const batch = await m
.getRepository(OutboxEventEntity)
.createQueryBuilder('e')
.where('e.publishedAt IS NULL')
.orderBy('e.id')
.limit(100)
.setLock('pessimistic_write')
.setOnLocked('skip_locked')
.getMany();
if (batch.length === 0) return;
const producer = this.kafka.producer({ idempotent: true });
await producer.connect();
try {
await producer.send({
topic: batch[0].topic,
messages: batch.map((row) => ({
key: row.partitionKey,
value: JSON.stringify(row.payload),
headers: { eventId: row.eventId, eventType: row.eventType },
})),
});
} finally {
await producer.disconnect();
}
await m.update(
OutboxEventEntity,
batch.map((r) => r.id),
{ publishedAt: new Date() },
);
});
}
}
setLock('pessimistic_write').setOnLocked('skip_locked') — TypeORM-обёртка над FOR UPDATE SKIP LOCKED. Несколько инстансов relay-я могут работать параллельно: каждый захватывает свою порцию, не блокируя других.
Если relay упал между producer.send и m.update — следующий цикл возьмёт те же строки и опубликует снова. At-least-once: именно поэтому consumer обязан быть идемпотентным.
Single source of truth
R-DIST-OBX-3: PG сервиса — единственный источник правды. kafkajs producer — транспорт.
Что это даёт:
- Потеря Kafka-данных (retention истёк, broker недоступен) —
outbox_eventпродолжает накапливать; после восстановления relay публикует пропущенное. - Rebuild read-projection — скрипт перечитывает
outbox_eventи публикует события заново; consumer обрабатывает идемпотентно. - Audit — полная история событий, которые сервис когда-либо порождал.
Inbox pattern
R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_event с processed = false, отдельный handler обрабатывает unprocessed-строки в DataSource.transaction.
CREATE TABLE inbox_event (
event_id uuid PRIMARY KEY,
topic text NOT NULL,
received_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
processed boolean NOT NULL DEFAULT false,
processed_at timestamptz
);
CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at) WHERE NOT processed;
// src/payment/inbox/payment-inbox-consumer.service.ts
@Injectable()
export class PaymentInboxConsumerService implements OnModuleInit {
constructor(
private readonly kafka: Kafka,
private readonly dataSource: DataSource,
) {}
async onModuleInit(): Promise<void> {
const consumer = this.kafka.consumer({ groupId: 'payment-inbox-consumer' });
await consumer.connect();
await consumer.subscribe({ topic: 'payment.events' });
await consumer.run({
eachMessage: async ({ message }) => {
const eventId = message.headers?.eventId?.toString() ?? randomUUID();
await this.dataSource
.getRepository(InboxEventEntity)
.createQueryBuilder()
.insert()
.values({ eventId, topic: 'payment.events', payload: JSON.parse(message.value!.toString()) })
.orIgnore()
.execute();
},
});
}
}
// src/payment/inbox/payment-inbox-processor.service.ts
@Injectable()
export class PaymentInboxProcessorService implements OnModuleInit, OnModuleDestroy {
private timer: NodeJS.Timeout | null = null;
constructor(
private readonly dataSource: DataSource,
private readonly handler: PaymentEventHandler,
) {}
onModuleInit(): void {
this.timer = setInterval(() => this.process().catch(noop), 200);
}
onModuleDestroy(): void {
if (this.timer) clearInterval(this.timer);
}
private async process(): Promise<void> {
const batch = await this.dataSource
.getRepository(InboxEventEntity)
.createQueryBuilder('e')
.where('e.processed = false')
.orderBy('e.receivedAt')
.limit(50)
.setLock('pessimistic_write')
.setOnLocked('skip_locked')
.getMany();
for (const row of batch) {
await this.dataSource.transaction(async (m) => {
await this.handler.handle(m, row.payload);
await m.update(InboxEventEntity, { eventId: row.eventId }, {
processed: true,
processedAt: new Date(),
});
});
}
}
}
Когда использовать inbox
- Высокий burst — Kafka даёт резкий пик 10k msg/s, обработка тяжёлая (5 ms × 10k = 50 секунд). Inbox развязывает приём и обработку: consumer принимает быстро, processor работает в своём темпе.
- Critical financial flows — нужно гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при крэше.
В большинстве случаев inbox избыточен. Достаточно processed_event dedup с .orIgnore() в той же транзакции, что и доменная обработка:
// src/product/handlers/product-reserved.handler.ts
@Injectable()
export class ProductReservedHandler {
constructor(private readonly dataSource: DataSource) {}
async handle(event: ProductReservedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
const inserted = await m
.getRepository(ProcessedEventEntity)
.createQueryBuilder()
.insert()
.values({ eventId: event.eventId, processedAt: new Date() })
.orIgnore()
.execute();
if (inserted.raw.affectedRows === 0) {
return;
}
await m
.createQueryBuilder()
.update(ProductEntity)
.set({ reservedQuantity: () => 'reserved_quantity + :qty', updatedAt: new Date() })
.where('id = :id', { id: event.productId })
.setParameter('qty', event.quantity)
.execute();
});
}
}
.orIgnore() — TypeORM-обёртка над INSERT ... ON CONFLICT DO NOTHING. UNIQUE-constraint на event_id в processed_event гарантирует деdup под race: если два consumer-а получили одно сообщение, только один запишет запись; второй получит affectedRows === 0 и выйдет.
| Критерий | Только processed_event | Inbox + processor |
|---|---|---|
| Сложность | низкая | средняя |
| Burst handling | ограничен concurrency consumer-а | развязка приёма и обработки |
| Recovery после крэша | re-consume из Kafka | отдельная обработка inbox |
| Когда применять | дефолт | financial / high burst |
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
producer.send() напрямую из command-handler | R-DIST-OBX-X1 | OutboxWriter.write(m, ...) в той же DataSource.transaction |
TypeORM subscriber afterInsert / afterUpdate для отправки в Kafka | R-DIST-OBX-X2 | outbox-relay с FOR UPDATE SKIP LOCKED |
Relay без setOnLocked('skip_locked') | R-DIST-OBX-1 | .setLock('pessimistic_write').setOnLocked('skip_locked') при нескольких инстансах |
| Outbox без partial-index по unpublished | R-DIST-OBX-1 | WHERE published_at IS NULL |
| Kafka как source of truth | R-DIST-OBX-3 | PG — SoT, kafkajs — транспорт |
| Inbox для каждого consumer-а по умолчанию | R-DIST-OBX-2 | processed_event dedup через .orIgnore() — дефолт; inbox только для financial / high burst |
DELETE published строк из outbox сразу после relay | R-DIST-OBX-3 | хранить для audit/rebuild, чистить retention-job-ом по расписанию |
Куда дальше
- Idempotency — receiver обязан быть идемпотентным при at-least-once;
processed_event+.orIgnore(). - Saga — оркестрация vs хореография — saga-state-таблица и outbox работают вместе; команды orchestrator-а публикуются через outbox.
- Eventual consistency — outbox — главный механизм EC; bounded-staleness SLO.
- Compensation — compensation-команды публикуются через outbox в той же
DataSource.transaction. - Distributed transactions — что не делать — почему multi-DataSource-commit-цепочка не замена outbox.
- Когда нужны распределённые паттерны — outbox актуален только при cross-service событиях.