Опирается на правила: R-DIST-SAGA-1R-DIST-SAGA-5 и R-DIST-SAGA-X1R-DIST-SAGA-X4 из Distributed Patterns Style Guide → раздел 2. Saga — оркестрация vs хореография.

Важно знать

  • Saga — серия локальных UoW-транзакций (AsyncSession.commit()) + compensation; не один распределённый AsyncSession через несколько баз.
  • Orchestration (центральный координатор-компонент) — для complex sagas 4+ шагов или с branching. Весь flow читается в одном классе.
  • Choreography (события без координатора) — для simple sagas 2-3 шагов без branching. При росте шагов — заменить orchestration.
  • Saga state хранится в БД (saga_<name> таблица через SQLAlchemy) — recovery после рестарта, видимость in-flight саг, audit.
  • saga_id UUID проходит через каждое Kafka-сообщение и каждый HTTP-заголовок X-Saga-Id.
  • Запрет: несколько AsyncSession с последовательными commit() через разные БД — это не атомарность (R-DIST-TX-X3).
  • Orchestrator — отдельный Python-класс (OrderSagaOrchestrator), не UseCase-handler (R-DIST-SAGA-X4).
  • Saga state не в памяти (dict / instance-переменная) — рестарт теряет все in-flight саги (R-DIST-SAGA-X3).

Saga — стандартный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» означает три сервиса с тремя независимыми PostgreSQL, saga собирает их в согласованную бизнес-операцию через локальные транзакции и compensation при сбое.

Когда применять Saga

R-DIST-SAGA-1: saga применяется, когда выполнены все три условия:

  1. Операция охватывает 2+ сервиса.
  2. Каждый шаг транзакционен локальноasync with session.begin() в своём PG.
  3. Нужна возможность compensation при сбое промежуточного шага.

Если compensation не нужна (достаточно retry через idempotent consumer) — saga избыточна, хватит outbox + idempotent consumer.

Orchestration — для complex sagas

R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный координатор (OrderSagaOrchestrator) знает все шаги, условия переходов и compensation-цепочки.

# saga/order_saga_orchestrator.py
import uuid
from sqlalchemy.ext.asyncio import AsyncSession

from app.order.repository import OrderRepository
from app.payment.client import PaymentClient
from app.inventory.client import InventoryClient
from app.saga.repository import SagaStateRepository


class OrderSagaOrchestrator:
    def __init__(
        self,
        session: AsyncSession,
        order_repo: OrderRepository,
        payment_client: PaymentClient,
        inventory_client: InventoryClient,
        saga_repo: SagaStateRepository,
    ) -> None:
        self._session = session
        self._order_repo = order_repo
        self._payment_client = payment_client
        self._inventory_client = inventory_client
        self._saga_repo = saga_repo

    async def run(self, command: CreateOrderCommand) -> None:
        saga_id = uuid.uuid4()
        await self._saga_repo.create(saga_id, "order_creation", command.model_dump())

        order_id: uuid.UUID | None = None
        payment_id: uuid.UUID | None = None

        try:
            await self._saga_repo.update_step(saga_id, "CREATE_ORDER")
            order_id = await self._order_repo.create(saga_id, command)
            await self._session.commit()

            await self._saga_repo.update_step(saga_id, "CHARGE_PAYMENT")
            payment_id = await self._payment_client.charge(saga_id, order_id, command.amount)

            await self._saga_repo.update_step(saga_id, "RESERVE_INVENTORY")
            await self._inventory_client.reserve(saga_id, order_id, command.items)

            await self._saga_repo.update_step(saga_id, "CONFIRM_ORDER")
            await self._order_repo.confirm(saga_id, order_id)
            await self._session.commit()

            await self._saga_repo.complete(saga_id)
        except Exception as exc:
            await self._compensate(saga_id, order_id, payment_id)
            raise SagaFailedError(saga_id) from exc

    async def _compensate(
        self,
        saga_id: uuid.UUID,
        order_id: uuid.UUID | None,
        payment_id: uuid.UUID | None,
    ) -> None:
        await self._saga_repo.update_status(saga_id, "COMPENSATING")
        if payment_id is not None:
            await self._payment_client.refund(saga_id, payment_id)
        if order_id is not None:
            await self._order_repo.cancel(saga_id, order_id)
            await self._session.commit()
        await self._saga_repo.update_status(saga_id, "FAILED")

Каждый commit() — локальная транзакция своего сервиса. Между шагами нет открытой распределённой транзакции. Если orchestrator упал на шаге 3, recovery-фоновая задача читает IN_PROGRESS-саги из таблицы и продолжает.

Choreography — для simple sagas

R-DIST-SAGA-3: choreography — для 2-3 шагов без branching. Каждый сервис публикует события и реагирует на события других. Центральный координатор отсутствует.

order.created → payment-service charges → payment.charged → order-service confirms
                                        ↘ payment.failed  → order-service cancels

Реализация через aiokafka-консьюмер в payment-service:

# payment/consumers/order_created_consumer.py
from aiokafka import AIOKafkaConsumer
from app.payment.use_cases.charge_payment import ChargePaymentUseCase
from app.outbox.publisher import OutboxPublisher


class OrderCreatedConsumer:
    def __init__(
        self,
        consumer: AIOKafkaConsumer,
        charge_use_case: ChargePaymentUseCase,
        outbox: OutboxPublisher,
    ) -> None:
        self._consumer = consumer
        self._charge = charge_use_case
        self._outbox = outbox

    async def run(self) -> None:
        async for msg in self._consumer:
            event = OrderCreatedEvent.model_validate_json(msg.value)
            try:
                payment_id = await self._charge.execute(event.saga_id, event.order_id, event.amount)
                await self._outbox.publish(PaymentChargedEvent(saga_id=event.saga_id, payment_id=payment_id))
            except InsufficientFundsError:
                await self._outbox.publish(PaymentFailedEvent(saga_id=event.saga_id, reason="insufficient_funds"))

И реакция в order-service:

# order/consumers/payment_result_consumer.py
class PaymentResultConsumer:
    async def run(self) -> None:
        async for msg in self._consumer:
            if msg.key == b"payment.charged":
                event = PaymentChargedEvent.model_validate_json(msg.value)
                async with self._session.begin():
                    order = await self._order_repo.find_by_saga_id(event.saga_id)
                    order.confirm()
            elif msg.key == b"payment.failed":
                event = PaymentFailedEvent.model_validate_json(msg.value)
                async with self._session.begin():
                    order = await self._order_repo.find_by_saga_id(event.saga_id)
                    order.cancel()
ПараметрOrchestrationChoreography
Шагов4+2-3
Branchingданет
Видимость flowодин классN сервисов
Где stateу orchestrator-ау каждого сервиса
Сложность реализациисредняянизкая (на старте)
Сложность отладкисредняявысокая при росте

Saga state в PostgreSQL

R-DIST-SAGA-4: state саги хранится в БД (saga_<name> таблица). Это даёт recovery, видимость и audit.

# saga/models.py
from sqlalchemy import Column, String, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from app.db.base import Base
import uuid


class SagaOrderCreation(Base):
    __tablename__ = "saga_order_creation"

    saga_id = Column(PGUUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    status = Column(String, nullable=False)      # IN_PROGRESS | COMPLETED | FAILED | COMPENSATING
    current_step = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    started_at = Column(DateTime(timezone=True), nullable=False)
    completed_at = Column(DateTime(timezone=True))
    last_error = Column(String)
CREATE INDEX ix_saga_order_creation_active
    ON saga_order_creation (status)
    WHERE status IN ('IN_PROGRESS', 'COMPENSATING');

Partial index по активным статусам — потому что 99% строк быстро становятся COMPLETED, искать нужно только активные.

Recovery фоновая задача (запускается при старте FastAPI через lifespan):

# saga/recovery.py
async def recover_in_flight_sagas(orchestrator: OrderSagaOrchestrator, saga_repo: SagaStateRepository) -> None:
    in_progress = await saga_repo.find_by_status(["IN_PROGRESS", "COMPENSATING"])
    for saga_state in in_progress:
        await orchestrator.resume(saga_state)
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    await recover_in_flight_sagas(orchestrator, saga_repo)
    yield

saga_id сквозной

R-DIST-SAGA-5: saga_id (UUID) проходит через каждое сообщение и каждый HTTP-запрос между сервисами.

# В aiokafka-сообщении — поле в Pydantic-модели
class OrderCreatedEvent(BaseModel):
    saga_id: uuid.UUID
    event_id: uuid.UUID
    event_type: str = "OrderCreated.v1"
    order_id: uuid.UUID
    customer_id: uuid.UUID
    amount: Decimal
# HTTP-заголовок при межсервисном вызове
import httpx

async def call_payment_service(saga_id: uuid.UUID, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service/internal/payments",
            json=payload,
            headers={"X-Saga-Id": str(saga_id)},
        )
        response.raise_for_status()
        return response.json()

В таблицах сервисов — колонка saga_id UUID с индексом:

ALTER TABLE orders ADD COLUMN saga_id UUID;
CREATE INDEX ix_orders_saga_id ON orders (saga_id);

Это даёт SELECT * FROM orders WHERE saga_id = $1 — что произошло в этой саге в конкретном сервисе.

Разделение handler и orchestrator

R-DIST-SAGA-X4: handler UseCase делает только локальный шаг и публикует событие через outbox. Orchestrator — отдельный компонент, который реагирует на это событие.

# ПЛОХО — сага встроена в handler use case-а
class CreateOrderHandler:
    async def handle(self, command: CreateOrderCommand) -> Order:
        async with self._session.begin():
            order = Order.create(command)
            self._session.add(order)
        # HTTP-вызов вне транзакции — нет атомарности с созданием заказа
        payment_id = await self._payment_client.charge(order.id, command.amount)
        try:
            await self._inventory_client.reserve(order.id, command.items)
        except Exception:
            await self._payment_client.refund(payment_id)    # compensation в handler!
            raise
        return order


# ХОРОШО — handler делает локальный шаг, orchestrator реагирует отдельно
class CreateOrderHandler:
    async def handle(self, command: CreateOrderCommand) -> Order:
        async with self._session.begin():
            order = Order.start(command)
            self._session.add(order)
            await self._outbox.publish(OrderStartedEvent(saga_id=order.saga_id, order_id=order.id))
        return order


class OrderSagaOrchestrator:
    async def on_order_started(self, event: OrderStartedEvent) -> None:
        # реагирует на событие, ведёт следующие шаги
        await self._saga_repo.update_step(event.saga_id, "CHARGE_PAYMENT")
        payment_id = await self._payment_client.charge(event.saga_id, event.order_id, ...)
        ...

Что запрещено

АнтипаттернПравилоЧто взамен
2PC/XA: несколько AsyncSession с последовательными commit() через разные PGR-DIST-SAGA-X1, R-DIST-TX-X3saga с локальными UoW-транзакциями
Saga без compensation-командR-DIST-SAGA-X2каждый шаг имеет парную compensation
Saga state в dict или instance-переменной orchestrator-аR-DIST-SAGA-X3saga_<name> таблица в PG + recovery
Saga смешана с UseCase в одном handle()-методеR-DIST-SAGA-X4отдельный OrderSagaOrchestrator
HTTP-вызов вне async with session.begin() без outboxR-DIST-SAGA-X4outbox → событие → orchestrator
Choreography на 5+ шагов без координатораR-DIST-SAGA-2orchestration с центральным классом
producer.send() прямо из command-handlerR-DIST-OBX-X1outbox-запись в той же транзакции

Куда дальше

  • Compensation — semantic state-change, не DELETE; идемпотентность compensation в Python.
  • Distributed transactions — почему цепочка AsyncSession.commit() через разные БД не даёт атомарности.
  • Eventual consistency — read-your-writes для in-flight саги через FastAPI.
  • Idempotency — каждый шаг саги обязан быть идемпотентным; processed_event-таблица.
  • Outbox + Inbox — публикация шагов и событий саги через outbox в той же UoW.
  • Когда нужны распределённые паттерны — alternatives check перед saga: объединить BC, modular monolith.