Опирается на правила:
R-DIST-SAGA-1…R-DIST-SAGA-5иR-DIST-SAGA-X1…R-DIST-SAGA-X4из Distributed Patterns Style Guide → раздел 2. Saga — оркестрация vs хореография.
Важно знать
- Saga — серия локальных транзакций + compensation; не один большой транзакшен через 2PC/XA.
@nestjs/cqrsSagas (RxJS + in-memory EventBus) — не распределённая saga: state не персистентен, событие живёт в одном процессе; для durable-оркестрации — отдельный@Injectableorchestrator с TypeORM-entity.- Orchestration (центральный
OrderSagaOrchestrator) — для complex sagas 4+ шагов или с branching. Весь flow виден в одном классе.- Choreography (kafkajs-consumer без координатора) — для simple sagas 2-3 шагов без branching. Логика рассредоточена по сервисам.
- Saga state хранится в TypeORM-entity (
saga_<name>таблица) — recovery после рестарта, видимость in-flight саг и audit.sagaIdсквозной в каждом сообщении (Kafka header + поле payload) и HTTP-заголовкеX-Saga-Id— единственный способ корректно трассировать сагу через сервисы.- Orchestrator — отдельный
@Injectable, не часть use case handler-а; шаг саги = state-переход + команда в outbox в однойDataSource.transaction.
Saga — главный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» охватывает три сервиса (Order, Payment, Inventory) — каждый со своей БД и своими транзакциями — saga собирает их в одну согласованную операцию через локальные транзакции и compensation при сбое.
Когда применять Saga
R-DIST-SAGA-1: Saga применяется когда выполнены все три условия:
- Операция охватывает 2+ сервиса.
- Каждый шаг должен быть transactional локально (commit в свой PG через
DataSource.transaction). - Нужна возможность compensation — отмены предыдущих шагов при сбое промежуточного.
Если третье условие отсутствует (можно дотолкать retry-ями без отката) — достаточно outbox + idempotent consumer, saga избыточна.
Orchestration — для complex sagas
R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный OrderSagaOrchestrator — отдельный @Injectable, реагирует на события шагов через kafkajs-consumer, продвигает state в БД и публикует следующую команду через outbox — всё в одной DataSource.transaction.
// saga/order-saga.entity.ts
@Entity('saga_order_creation')
export class OrderSagaEntity {
@PrimaryColumn('uuid')
sagaId: string;
@Column()
status: 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'COMPENSATING';
@Column()
currentStep: string;
@Column({ type: 'jsonb' })
payload: Record<string, unknown>;
@CreateDateColumn()
startedAt: Date;
@Column({ nullable: true })
completedAt: Date | null;
@Column({ nullable: true })
lastError: string | null;
}
// saga/order-saga.orchestrator.ts
@Injectable()
export class OrderSagaOrchestrator {
constructor(private readonly dataSource: DataSource) {}
async start(command: CreateOrderCommand): Promise<void> {
const sagaId = uuidv7();
await this.dataSource.transaction(async (m) => {
await m.insert(OrderSagaEntity, {
sagaId,
status: 'IN_PROGRESS',
currentStep: 'CREATE_ORDER',
payload: command,
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId,
type: 'CreateOrderCommand.v1',
payload: { orderId: command.orderId, customerId: command.customerId },
}));
});
}
async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
currentStep: 'REQUEST_PAYMENT',
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId: event.sagaId,
type: 'RequestPaymentCommand.v1',
payload: { orderId: event.orderId, amount: event.amount },
}));
});
}
async onPaymentCharged(event: PaymentChargedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
currentStep: 'RESERVE_INVENTORY',
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId: event.sagaId,
type: 'ReserveInventoryCommand.v1',
payload: { orderId: event.orderId, items: event.items },
}));
});
}
async onInventoryReserved(event: InventoryReservedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
status: 'COMPLETED',
currentStep: 'DONE',
completedAt: new Date(),
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId: event.sagaId,
type: 'ConfirmOrderCommand.v1',
payload: { orderId: event.orderId },
}));
});
}
async onPaymentFailed(event: PaymentFailedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
status: 'COMPENSATING',
currentStep: 'CANCEL_ORDER',
lastError: event.reason,
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId: event.sagaId,
type: 'CancelOrderCommand.v1',
payload: { orderId: event.orderId, reason: event.reason },
}));
});
}
}
// saga/order-saga.consumer.ts
@Injectable()
export class OrderSagaConsumer implements OnModuleInit {
constructor(
private readonly orchestrator: OrderSagaOrchestrator,
private readonly kafka: KafkaService,
) {}
async onModuleInit(): Promise<void> {
await this.kafka.subscribe(
['order.events', 'payment.events', 'inventory.events'],
async (message) => {
const event = JSON.parse(message.value.toString());
switch (event.type) {
case 'OrderCreated.v1':
return this.orchestrator.onOrderCreated(event);
case 'PaymentCharged.v1':
return this.orchestrator.onPaymentCharged(event);
case 'InventoryReserved.v1':
return this.orchestrator.onInventoryReserved(event);
case 'PaymentFailed.v1':
return this.orchestrator.onPaymentFailed(event);
}
},
);
}
}
Каждый переход — DataSource.transaction: state-update + outbox-insert атомарно. Если процесс упал между шагами, при рестарте читаем все IN_PROGRESS саги из saga_order_creation и продолжаем.
Choreography — для simple sagas
R-DIST-SAGA-3: choreography — для 2-3 шагов без branching. Каждый сервис подписан на события и реагирует самостоятельно. Центрального координатора нет.
CustomerRegistered → sber-bonus-service credits welcome bonus → BonusGranted → customer-service confirms
↘ BonusFailed → customer-service flags
// В bonus-service: consumer реагирует на событие из customer-service
@Injectable()
export class CustomerRegisteredConsumer implements OnModuleInit {
constructor(private readonly dataSource: DataSource) {}
async onModuleInit(): Promise<void> {
await this.kafka.subscribe(['customer.events'], async (message) => {
const event = JSON.parse(message.value.toString());
if (event.type !== 'CustomerRegistered.v1') return;
await this.dataSource.transaction(async (m) => {
const alreadyProcessed = await m.findOneBy(ProcessedEventEntity, {
eventId: event.eventId,
});
if (alreadyProcessed) return;
await m.insert(BonusAccountEntity, {
customerId: event.payload.customerId,
sagaId: event.sagaId,
balance: 500,
});
await m.insert(ProcessedEventEntity, { eventId: event.eventId });
await m.insert(OutboxEventEntity, toOutbox({
sagaId: event.sagaId,
type: 'BonusGranted.v1',
payload: { customerId: event.payload.customerId, amount: 500 },
}));
});
});
}
}
// В customer-service: consumer реагирует на ответ из bonus-service
@Injectable()
export class BonusGrantedConsumer implements OnModuleInit {
constructor(private readonly dataSource: DataSource) {}
async onModuleInit(): Promise<void> {
await this.kafka.subscribe(['bonus.events'], async (message) => {
const event = JSON.parse(message.value.toString());
if (event.type !== 'BonusGranted.v1') return;
await this.dataSource.transaction(async (m) => {
const alreadyProcessed = await m.findOneBy(ProcessedEventEntity, {
eventId: event.eventId,
});
if (alreadyProcessed) return;
await m.update(CustomerEntity, { sagaId: event.sagaId }, {
status: 'ACTIVE',
});
await m.insert(ProcessedEventEntity, { eventId: event.eventId });
});
});
}
}
| Параметр | Orchestration | Choreography |
|---|---|---|
| Шагов | 4+ | 2-3 |
| Branching | да | нет |
| Видимость flow | один @Injectable | N consumer-ов |
| Где state | saga_<name>-таблица у orchestrator-а | у каждого сервиса |
| Сложность реализации | средняя | низкая на старте |
| Сложность отладки | средняя | высокая при росте |
Saga state в TypeORM
R-DIST-SAGA-4: state саги хранится в TypeORM-entity (saga_<name> таблица). Три критичных свойства:
// migration: создание таблицы состояния саги
export class CreateSagaOrderCreation1234567890123 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE saga_order_creation (
saga_id uuid PRIMARY KEY,
status text NOT NULL,
current_step text NOT NULL,
payload jsonb NOT NULL,
started_at timestamptz NOT NULL DEFAULT now(),
completed_at timestamptz,
last_error text
);
CREATE INDEX ix_saga_order_creation_active
ON saga_order_creation (status)
WHERE status IN ('IN_PROGRESS', 'COMPENSATING');
`);
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query('DROP TABLE saga_order_creation');
}
}
- Видимость.
SELECT * FROM saga_order_creation WHERE status = 'IN_PROGRESS'— какие саги в процессе прямо сейчас. Без этого service operations слепые. - Recovery. Если процесс упал на шаге 3, после рестарта читаем все
IN_PROGRESSсаги и продолжаем. Без state-таблицы in-flight саги теряются. - Audit. История каждой саги остаётся в БД — кто, когда, какой шаг сломал, на чём compensation срабатывал.
Partial index WHERE status IN ('IN_PROGRESS', 'COMPENSATING') — 99% rows быстро переходят в COMPLETED, искать нужно только активные.
sagaId сквозной
R-DIST-SAGA-5: sagaId (UUID v7) проходит через каждое Kafka-сообщение, каждый HTTP-запрос между сервисами, каждое доменное событие.
// Kafka message: sagaId в header + payload
const message = {
headers: { 'x-saga-id': sagaId, 'x-event-id': uuidv7() },
value: JSON.stringify({
sagaId,
eventId: uuidv7(),
type: 'OrderCreated.v1',
payload: { orderId, customerId },
}),
};
// HTTP между сервисами
const response = await this.httpService.axiosRef.post(
`${PAYMENT_SERVICE_URL}/payments`,
body,
{ headers: { 'X-Saga-Id': sagaId } },
);
// В таблицах сервисов — колонка saga_id с индексом
@Entity('orders')
export class OrderEntity {
@PrimaryColumn('uuid')
orderId: string;
@Index()
@Column('uuid')
sagaId: string;
@Column()
status: string;
}
SELECT * FROM orders WHERE saga_id = $1 — что произошло в этой саге в данном сервисе. Связывает шаги для tracing, debugging и idempotency.
Отдельность orchestrator от use case handler
R-DIST-SAGA-X4: CreateOrderHandler не должен сам вызывать payment-service и inventory-service. Handler пишет локальный шаг → публикует событие в outbox → отдельный OrderSagaOrchestrator ведёт сагу.
// AVOID: saga встроена в handler use case-а
@Injectable()
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
async execute(command: CreateOrderCommand): Promise<void> {
await this.dataSource.transaction(async (m) => {
const order = m.create(OrderEntity, { ...command, status: 'PROCESSING' });
await m.save(order);
// прямой HTTP внутри транзакции — связывает время транзакции с сетевым вызовом
const payment = await this.paymentClient.charge(order.orderId, command.amount);
await this.inventoryClient.reserve(order.orderId, command.items);
});
}
}
// PREFER: handler делает только локальный шаг, orchestrator — отдельно
@Injectable()
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
async execute(command: CreateOrderCommand): Promise<void> {
const sagaId = uuidv7();
await this.dataSource.transaction(async (m) => {
await m.insert(OrderEntity, {
orderId: command.orderId,
sagaId,
customerId: command.customerId,
status: 'PENDING',
});
await m.insert(OutboxEventEntity, toOutbox({
sagaId,
type: 'OrderStarted.v1',
payload: { orderId: command.orderId, amount: command.amount, items: command.items },
}));
});
await this.sagaOrchestrator.start({ sagaId, ...command });
}
}
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| 2PC/XA вместо саги (Kafka не XA, Node-драйверы XA не поддерживают) | R-DIST-SAGA-X1 | saga с локальными DataSource.transaction |
| Saga без compensation-команд | R-DIST-SAGA-X2 | каждый шаг имеет парный compensation |
Saga state в Map<string, SagaState> или RxJS-стейте | R-DIST-SAGA-X3 | saga_<name>-entity в PG |
@nestjs/cqrs Sagas (RxJS + in-memory EventBus) для распределённой саги | R-DIST-SAGA-X3 | отдельный @Injectable orchestrator с TypeORM |
| Saga смешана с use case в одном handler-е | R-DIST-SAGA-X4 | отдельный orchestrator |
HTTP-вызов внутри DataSource.transaction write-handler-а | R-DIST-SAGA-X4 | outbox + событие → orchestrator |
| Choreography на 5+ шагов | R-DIST-SAGA-2 | orchestration с центральным координатором |
Куда дальше
- Distributed Patterns → раздел 2. Saga — нормативные формулировки.
- Compensation — semantic state-change, не DELETE; идемпотентность compensation.
- Idempotency — каждый шаг саги обязан быть идемпотентным.
- Outbox + Inbox — публикация шагов и событий саги.
- Eventual consistency — read-your-writes для in-flight саги.
- Distributed transactions — почему 2PC/XA не вариант в Node-экосистеме.
- Когда нужны распределённые паттерны — критерии выбора между saga, modular monolith и локальной транзакцией.