Опирается на правила: R-DIST-SAGA-1R-DIST-SAGA-5 и R-DIST-SAGA-X1R-DIST-SAGA-X4 из Distributed Patterns Style Guide → раздел 2. Saga — оркестрация vs хореография.

Важно знать

  • Saga — серия локальных транзакций + compensation; не один большой транзакшен через 2PC/XA.
  • @nestjs/cqrs Sagas (RxJS + in-memory EventBus) — не распределённая saga: state не персистентен, событие живёт в одном процессе; для durable-оркестрации — отдельный @Injectable orchestrator с TypeORM-entity.
  • Orchestration (центральный OrderSagaOrchestrator) — для complex sagas 4+ шагов или с branching. Весь flow виден в одном классе.
  • Choreography (kafkajs-consumer без координатора) — для simple sagas 2-3 шагов без branching. Логика рассредоточена по сервисам.
  • Saga state хранится в TypeORM-entity (saga_<name> таблица) — recovery после рестарта, видимость in-flight саг и audit.
  • sagaId сквозной в каждом сообщении (Kafka header + поле payload) и HTTP-заголовке X-Saga-Id — единственный способ корректно трассировать сагу через сервисы.
  • Orchestrator — отдельный @Injectable, не часть use case handler-а; шаг саги = state-переход + команда в outbox в одной DataSource.transaction.

Saga — главный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» охватывает три сервиса (Order, Payment, Inventory) — каждый со своей БД и своими транзакциями — saga собирает их в одну согласованную операцию через локальные транзакции и compensation при сбое.

Когда применять Saga

R-DIST-SAGA-1: Saga применяется когда выполнены все три условия:

  1. Операция охватывает 2+ сервиса.
  2. Каждый шаг должен быть transactional локально (commit в свой PG через DataSource.transaction).
  3. Нужна возможность compensation — отмены предыдущих шагов при сбое промежуточного.

Если третье условие отсутствует (можно дотолкать retry-ями без отката) — достаточно outbox + idempotent consumer, saga избыточна.

Orchestration — для complex sagas

R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный OrderSagaOrchestrator — отдельный @Injectable, реагирует на события шагов через kafkajs-consumer, продвигает state в БД и публикует следующую команду через outbox — всё в одной DataSource.transaction.

// saga/order-saga.entity.ts
@Entity('saga_order_creation')
export class OrderSagaEntity {
  @PrimaryColumn('uuid')
  sagaId: string;

  @Column()
  status: 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'COMPENSATING';

  @Column()
  currentStep: string;

  @Column({ type: 'jsonb' })
  payload: Record<string, unknown>;

  @CreateDateColumn()
  startedAt: Date;

  @Column({ nullable: true })
  completedAt: Date | null;

  @Column({ nullable: true })
  lastError: string | null;
}
// saga/order-saga.orchestrator.ts
@Injectable()
export class OrderSagaOrchestrator {
  constructor(private readonly dataSource: DataSource) {}

  async start(command: CreateOrderCommand): Promise<void> {
    const sagaId = uuidv7();

    await this.dataSource.transaction(async (m) => {
      await m.insert(OrderSagaEntity, {
        sagaId,
        status: 'IN_PROGRESS',
        currentStep: 'CREATE_ORDER',
        payload: command,
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId,
        type: 'CreateOrderCommand.v1',
        payload: { orderId: command.orderId, customerId: command.customerId },
      }));
    });
  }

  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
        currentStep: 'REQUEST_PAYMENT',
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId: event.sagaId,
        type: 'RequestPaymentCommand.v1',
        payload: { orderId: event.orderId, amount: event.amount },
      }));
    });
  }

  async onPaymentCharged(event: PaymentChargedEvent): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
        currentStep: 'RESERVE_INVENTORY',
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId: event.sagaId,
        type: 'ReserveInventoryCommand.v1',
        payload: { orderId: event.orderId, items: event.items },
      }));
    });
  }

  async onInventoryReserved(event: InventoryReservedEvent): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
        status: 'COMPLETED',
        currentStep: 'DONE',
        completedAt: new Date(),
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId: event.sagaId,
        type: 'ConfirmOrderCommand.v1',
        payload: { orderId: event.orderId },
      }));
    });
  }

  async onPaymentFailed(event: PaymentFailedEvent): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      await m.update(OrderSagaEntity, { sagaId: event.sagaId }, {
        status: 'COMPENSATING',
        currentStep: 'CANCEL_ORDER',
        lastError: event.reason,
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId: event.sagaId,
        type: 'CancelOrderCommand.v1',
        payload: { orderId: event.orderId, reason: event.reason },
      }));
    });
  }
}
// saga/order-saga.consumer.ts
@Injectable()
export class OrderSagaConsumer implements OnModuleInit {
  constructor(
    private readonly orchestrator: OrderSagaOrchestrator,
    private readonly kafka: KafkaService,
  ) {}

  async onModuleInit(): Promise<void> {
    await this.kafka.subscribe(
      ['order.events', 'payment.events', 'inventory.events'],
      async (message) => {
        const event = JSON.parse(message.value.toString());
        switch (event.type) {
          case 'OrderCreated.v1':
            return this.orchestrator.onOrderCreated(event);
          case 'PaymentCharged.v1':
            return this.orchestrator.onPaymentCharged(event);
          case 'InventoryReserved.v1':
            return this.orchestrator.onInventoryReserved(event);
          case 'PaymentFailed.v1':
            return this.orchestrator.onPaymentFailed(event);
        }
      },
    );
  }
}

Каждый переход — DataSource.transaction: state-update + outbox-insert атомарно. Если процесс упал между шагами, при рестарте читаем все IN_PROGRESS саги из saga_order_creation и продолжаем.

Choreography — для simple sagas

R-DIST-SAGA-3: choreography — для 2-3 шагов без branching. Каждый сервис подписан на события и реагирует самостоятельно. Центрального координатора нет.

CustomerRegistered → sber-bonus-service credits welcome bonus → BonusGranted → customer-service confirms
                                                               ↘ BonusFailed → customer-service flags
// В bonus-service: consumer реагирует на событие из customer-service
@Injectable()
export class CustomerRegisteredConsumer implements OnModuleInit {
  constructor(private readonly dataSource: DataSource) {}

  async onModuleInit(): Promise<void> {
    await this.kafka.subscribe(['customer.events'], async (message) => {
      const event = JSON.parse(message.value.toString());
      if (event.type !== 'CustomerRegistered.v1') return;

      await this.dataSource.transaction(async (m) => {
        const alreadyProcessed = await m.findOneBy(ProcessedEventEntity, {
          eventId: event.eventId,
        });
        if (alreadyProcessed) return;

        await m.insert(BonusAccountEntity, {
          customerId: event.payload.customerId,
          sagaId: event.sagaId,
          balance: 500,
        });
        await m.insert(ProcessedEventEntity, { eventId: event.eventId });
        await m.insert(OutboxEventEntity, toOutbox({
          sagaId: event.sagaId,
          type: 'BonusGranted.v1',
          payload: { customerId: event.payload.customerId, amount: 500 },
        }));
      });
    });
  }
}
// В customer-service: consumer реагирует на ответ из bonus-service
@Injectable()
export class BonusGrantedConsumer implements OnModuleInit {
  constructor(private readonly dataSource: DataSource) {}

  async onModuleInit(): Promise<void> {
    await this.kafka.subscribe(['bonus.events'], async (message) => {
      const event = JSON.parse(message.value.toString());
      if (event.type !== 'BonusGranted.v1') return;

      await this.dataSource.transaction(async (m) => {
        const alreadyProcessed = await m.findOneBy(ProcessedEventEntity, {
          eventId: event.eventId,
        });
        if (alreadyProcessed) return;

        await m.update(CustomerEntity, { sagaId: event.sagaId }, {
          status: 'ACTIVE',
        });
        await m.insert(ProcessedEventEntity, { eventId: event.eventId });
      });
    });
  }
}
ПараметрOrchestrationChoreography
Шагов4+2-3
Branchingданет
Видимость flowодин @InjectableN consumer-ов
Где statesaga_<name>-таблица у orchestrator-ау каждого сервиса
Сложность реализациисредняянизкая на старте
Сложность отладкисредняявысокая при росте

Saga state в TypeORM

R-DIST-SAGA-4: state саги хранится в TypeORM-entity (saga_<name> таблица). Три критичных свойства:

// migration: создание таблицы состояния саги
export class CreateSagaOrderCreation1234567890123 implements MigrationInterface {
  async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`
      CREATE TABLE saga_order_creation (
        saga_id      uuid PRIMARY KEY,
        status       text NOT NULL,
        current_step text NOT NULL,
        payload      jsonb NOT NULL,
        started_at   timestamptz NOT NULL DEFAULT now(),
        completed_at timestamptz,
        last_error   text
      );

      CREATE INDEX ix_saga_order_creation_active
        ON saga_order_creation (status)
        WHERE status IN ('IN_PROGRESS', 'COMPENSATING');
    `);
  }

  async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query('DROP TABLE saga_order_creation');
  }
}
  1. Видимость. SELECT * FROM saga_order_creation WHERE status = 'IN_PROGRESS' — какие саги в процессе прямо сейчас. Без этого service operations слепые.
  2. Recovery. Если процесс упал на шаге 3, после рестарта читаем все IN_PROGRESS саги и продолжаем. Без state-таблицы in-flight саги теряются.
  3. Audit. История каждой саги остаётся в БД — кто, когда, какой шаг сломал, на чём compensation срабатывал.

Partial index WHERE status IN ('IN_PROGRESS', 'COMPENSATING') — 99% rows быстро переходят в COMPLETED, искать нужно только активные.

sagaId сквозной

R-DIST-SAGA-5: sagaId (UUID v7) проходит через каждое Kafka-сообщение, каждый HTTP-запрос между сервисами, каждое доменное событие.

// Kafka message: sagaId в header + payload
const message = {
  headers: { 'x-saga-id': sagaId, 'x-event-id': uuidv7() },
  value: JSON.stringify({
    sagaId,
    eventId: uuidv7(),
    type: 'OrderCreated.v1',
    payload: { orderId, customerId },
  }),
};

// HTTP между сервисами
const response = await this.httpService.axiosRef.post(
  `${PAYMENT_SERVICE_URL}/payments`,
  body,
  { headers: { 'X-Saga-Id': sagaId } },
);
// В таблицах сервисов — колонка saga_id с индексом
@Entity('orders')
export class OrderEntity {
  @PrimaryColumn('uuid')
  orderId: string;

  @Index()
  @Column('uuid')
  sagaId: string;

  @Column()
  status: string;
}

SELECT * FROM orders WHERE saga_id = $1 — что произошло в этой саге в данном сервисе. Связывает шаги для tracing, debugging и idempotency.

Отдельность orchestrator от use case handler

R-DIST-SAGA-X4: CreateOrderHandler не должен сам вызывать payment-service и inventory-service. Handler пишет локальный шаг → публикует событие в outbox → отдельный OrderSagaOrchestrator ведёт сагу.

// AVOID: saga встроена в handler use case-а
@Injectable()
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  async execute(command: CreateOrderCommand): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      const order = m.create(OrderEntity, { ...command, status: 'PROCESSING' });
      await m.save(order);

      // прямой HTTP внутри транзакции — связывает время транзакции с сетевым вызовом
      const payment = await this.paymentClient.charge(order.orderId, command.amount);
      await this.inventoryClient.reserve(order.orderId, command.items);
    });
  }
}

// PREFER: handler делает только локальный шаг, orchestrator — отдельно
@Injectable()
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  async execute(command: CreateOrderCommand): Promise<void> {
    const sagaId = uuidv7();

    await this.dataSource.transaction(async (m) => {
      await m.insert(OrderEntity, {
        orderId: command.orderId,
        sagaId,
        customerId: command.customerId,
        status: 'PENDING',
      });
      await m.insert(OutboxEventEntity, toOutbox({
        sagaId,
        type: 'OrderStarted.v1',
        payload: { orderId: command.orderId, amount: command.amount, items: command.items },
      }));
    });

    await this.sagaOrchestrator.start({ sagaId, ...command });
  }
}

Что запрещено

АнтипаттернПравилоЧто взамен
2PC/XA вместо саги (Kafka не XA, Node-драйверы XA не поддерживают)R-DIST-SAGA-X1saga с локальными DataSource.transaction
Saga без compensation-командR-DIST-SAGA-X2каждый шаг имеет парный compensation
Saga state в Map<string, SagaState> или RxJS-стейтеR-DIST-SAGA-X3saga_<name>-entity в PG
@nestjs/cqrs Sagas (RxJS + in-memory EventBus) для распределённой сагиR-DIST-SAGA-X3отдельный @Injectable orchestrator с TypeORM
Saga смешана с use case в одном handler-еR-DIST-SAGA-X4отдельный orchestrator
HTTP-вызов внутри DataSource.transaction write-handler-аR-DIST-SAGA-X4outbox + событие → orchestrator
Choreography на 5+ шаговR-DIST-SAGA-2orchestration с центральным координатором

Куда дальше

  • Distributed Patterns → раздел 2. Saga — нормативные формулировки.
  • Compensation — semantic state-change, не DELETE; идемпотентность compensation.
  • Idempotency — каждый шаг саги обязан быть идемпотентным.
  • Outbox + Inbox — публикация шагов и событий саги.
  • Eventual consistency — read-your-writes для in-flight саги.
  • Distributed transactions — почему 2PC/XA не вариант в Node-экосистеме.
  • Когда нужны распределённые паттерны — критерии выбора между saga, modular monolith и локальной транзакцией.