Опирается на правила:
R-KFK-OBX-1…R-KFK-OBX-4иR-KFK-OBX-X1…R-KFK-OBX-X3из Kafka Style Guide → раздел 3. Outbox publishing.
Важно знать
- Domain events публикуются через outbox-relay, не напрямую
producer.sendиз handler-а.- Запись в
outbox_eventидёт в той жеDataSource.transaction, что бизнес-write. Либо обе commit, либо обе rollback.- Outbox-relay — отдельный
@Injectableс@Interval(@nestjs/schedule), читает unpublished сFOR UPDATE SKIP LOCKED, публикует через kafkajs, проставляетpublished_at.- Topic name derives от
eventType/aggregateType:<service>.<aggregate-type>.<event-name>.- Relay в batch (10–50 events) — снижает overhead DB-poll и Kafka-roundtrip.
- Partial index
WHERE published_at IS NULLобязателен — без него full scan при каждом poll.- Подписка «after commit» без outbox — потеря событий при падении между commit и publish.
producer.sendиз той же транзакции с DB — нельзя: Kafka не XA, rollback БД не откатит публикацию.
Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PostgreSQL. Теория — Distributed → outbox + inbox.
Entity и DDL
R-KFK-OBX-X3: схема с partial-индексом.
// src/outbox/outbox-event.entity.ts
@Entity('outbox_event')
export class OutboxEventEntity {
@PrimaryGeneratedColumn({ type: 'bigint' })
id: string;
@Column({ name: 'event_id', type: 'uuid', unique: true })
eventId: string;
@Column({ name: 'aggregate_type' })
aggregateType: string;
@Column({ name: 'aggregate_id' })
aggregateId: string;
@Column({ name: 'event_type' })
eventType: string;
@Column({ type: 'jsonb' })
payload: Record<string, unknown>;
@Column()
topic: string;
@Column({ name: 'partition_key' })
partitionKey: string;
@CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
createdAt: Date;
@Column({ name: 'published_at', type: 'timestamptz', nullable: true })
publishedAt: Date | null;
}
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
ON outbox_event(id)
WHERE published_at IS NULL;
WHERE published_at IS NULL — partial index покрывает только «тёплые» строки, которые ещё не опубликованы. Таблица растёт без bound, а relay сканирует только нужное.
Запись в outbox из handler
R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.
// src/order/application/confirm-order.handler.ts
@Injectable()
export class ConfirmOrderHandler {
constructor(private readonly dataSource: DataSource) {}
async handle(command: ConfirmOrderCommand): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const orderRepo = manager.getRepository(OrderEntity);
const outboxRepo = manager.getRepository(OutboxEventEntity);
const order = await orderRepo.findOne({
where: { id: command.orderId },
lock: { mode: 'pessimistic_write' },
});
if (!order) throw new OrderNotFoundError(command.orderId);
order.status = 'CONFIRMED';
order.confirmedAt = new Date();
await orderRepo.save(order);
await outboxRepo.save(
outboxRepo.create({
eventId: uuidv7(),
aggregateType: 'Order',
aggregateId: order.id,
eventType: 'order.confirmed.v1',
payload: {
eventId: uuidv7(),
eventType: 'OrderConfirmed.v1',
occurredAt: new Date().toISOString(),
aggregateType: 'Order',
aggregateId: order.id,
customerId: order.customerId,
totalAmount: order.totalAmount,
},
topic: 'order-service.order.confirmed',
partitionKey: order.id,
}),
);
});
}
}
Атомарность гарантирует PostgreSQL: либо обе записи commit, либо обе rollback. Никакой XA с Kafka не нужен.
Outbox-relay
R-KFK-OBX-2: отдельный @Injectable с @Interval.
// src/outbox/outbox-relay.service.ts
@Injectable()
export class OutboxRelayService {
private readonly logger = new Logger(OutboxRelayService.name);
constructor(
private readonly dataSource: DataSource,
private readonly producer: Producer,
) {}
@Interval(500)
async publish(): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const events = await manager
.getRepository(OutboxEventEntity)
.createQueryBuilder('e')
.setLock('pessimistic_write')
.setOnLocked('skip_locked')
.where('e.publishedAt IS NULL')
.orderBy('e.id', 'ASC')
.take(50)
.getMany();
if (!events.length) return;
const byTopic = Map.groupBy(events, (e) => e.topic);
for (const [topic, batch] of byTopic) {
await this.producer.send({
topic,
acks: -1,
messages: batch.map((e) => ({
key: e.partitionKey,
value: JSON.stringify(e.payload),
headers: { 'x-event-id': e.eventId, 'x-event-type': e.eventType },
})),
});
}
await manager.update(
OutboxEventEntity,
events.map((e) => e.id),
{ publishedAt: new Date() },
);
});
}
}
FOR UPDATE SKIP LOCKED (setLock('pessimistic_write').setOnLocked('skip_locked')) — несколько pod-ов relay могут работать параллельно, каждый берёт свою порцию без блокировок. Горизонтальное масштабирование без координации.
@Interval(500) даёт типичную задержку публикации ~500 мс. Для критичных flow — 100, для аналитики — 5000 вполне нормально.
Инициализация producer
R-KFK-PROD-1, R-KFK-OBX-2: producer создаётся идемпотентным и регистрируется как провайдер.
// src/kafka/kafka-producer.provider.ts
export const KafkaProducerProvider: FactoryProvider<Producer> = {
provide: KAFKA_PRODUCER,
useFactory: async (config: KafkaConfig): Promise<Producer> => {
const kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
ssl: config.ssl,
});
const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
await producer.connect();
return producer;
},
inject: [KafkaConfig],
};
// src/outbox/outbox.module.ts
@Module({
imports: [TypeOrmModule.forFeature([OutboxEventEntity]), ScheduleModule],
providers: [KafkaProducerProvider, OutboxRelayService],
})
export class OutboxModule {}
Topic naming
R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.
| Сервис | Topic |
|---|---|
| order-service | order-service.order.created |
| order-service | order-service.order.confirmed |
| order-service | order-service.order.cancelled |
| payment-service | payment-service.payment.charged |
| payment-service | payment-service.payment.refunded |
| product-service | product-service.product.published |
| customer-service | customer-service.customer.registered |
Альтернатива — один topic на aggregate:
product-service.product → { eventType: "ProductPublished.v1", ... }
→ { eventType: "ProductPriceChanged.v1", ... }
Удобно для consumer, которому нужны «все события по продукту»: один подписчик ловит всё, фильтрует по eventType. Цена — нельзя ack событие ProductPriceChanged, не коммитя весь топик.
Batch-обработка
R-KFK-OBX-4: relay читает 10–50 events за раз.
.take(50)
Почему не по одному:
- DB-poll overhead — каждый запрос ~1–2 мс даже с indexed scan.
- Kafka roundtrip —
producer.sendждёт ACK, ~5–20 мс. - При 100 events/s по одному — relay постоянно занят, задержка большая.
С batch 50 — relay поднимает 50 events одним запросом, шлёт через kafkajs, published_at ставит одним update. Throughput x10–20.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
producer.send из той же DataSource.transaction с DB-операцией | R-KFK-OBX-X1 | outbox в той же транзакции, relay публикует отдельно |
Подписка на «after commit» / queryRunner.release() + send | R-KFK-OBX-X2 | outbox-relay с @Interval |
Outbox без колонки published_at | R-KFK-OBX-X3 | published_at timestamptz nullable + partial index |
| Partial index отсутствует | R-KFK-OBX-X3 | WHERE published_at IS NULL — обязателен |
Relay без setLock('pessimistic_write').setOnLocked('skip_locked') | R-KFK-OBX-2 | SKIP LOCKED для параллельных pod-ов |
| Relay по одному событию | R-KFK-OBX-4 | batch .take(50) |
producer.send без acks: -1 | R-KFK-PROD-X2 | acks: -1 (all) обязателен |
Outbox без event_id UNIQUE | R-KFK-OBX-1 | UNIQUE constraint защищает от двойной записи |
Куда дальше
- Конфигурация — сборка
KafkaConfig, fail-fast на отсутствующий topic. - Producer — почему нельзя
producer.sendнапрямую из handler. - Idempotent consumer — receiver side at-least-once.
- Event design — формат payload, который пишется в
outbox_event. - Observability — метрики relay: lag, batch size, publish errors.
- Consumer — manual commit offset после успешной обработки.
- Retry topic + DLQ — что делать, если relay не может опубликовать.
- Security — TLS/SASL для kafkajs producer.