Опирается на правила:
R-DIST-SAGA-1…R-DIST-SAGA-5иR-DIST-SAGA-X1…R-DIST-SAGA-X4из Distributed Patterns Style Guide → раздел 2. Saga — оркестрация vs хореография.
Важно знать
- Saga — серия локальных UoW-транзакций (
AsyncSession.commit()) + compensation; не один распределённыйAsyncSessionчерез несколько баз.- Orchestration (центральный координатор-компонент) — для complex sagas 4+ шагов или с branching. Весь flow читается в одном классе.
- Choreography (события без координатора) — для simple sagas 2-3 шагов без branching. При росте шагов — заменить orchestration.
- Saga state хранится в БД (
saga_<name>таблица через SQLAlchemy) — recovery после рестарта, видимость in-flight саг, audit.saga_idUUID проходит через каждое Kafka-сообщение и каждый HTTP-заголовокX-Saga-Id.- Запрет: несколько
AsyncSessionс последовательнымиcommit()через разные БД — это не атомарность (R-DIST-TX-X3).- Orchestrator — отдельный Python-класс (
OrderSagaOrchestrator), неUseCase-handler (R-DIST-SAGA-X4).- Saga state не в памяти (
dict/ instance-переменная) — рестарт теряет все in-flight саги (R-DIST-SAGA-X3).
Saga — стандартный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» означает три сервиса с тремя независимыми PostgreSQL, saga собирает их в согласованную бизнес-операцию через локальные транзакции и compensation при сбое.
Когда применять Saga
R-DIST-SAGA-1: saga применяется, когда выполнены все три условия:
- Операция охватывает 2+ сервиса.
- Каждый шаг транзакционен локально —
async with session.begin()в своём PG. - Нужна возможность compensation при сбое промежуточного шага.
Если compensation не нужна (достаточно retry через idempotent consumer) — saga избыточна, хватит outbox + idempotent consumer.
Orchestration — для complex sagas
R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный координатор (OrderSagaOrchestrator) знает все шаги, условия переходов и compensation-цепочки.
# saga/order_saga_orchestrator.py
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from app.order.repository import OrderRepository
from app.payment.client import PaymentClient
from app.inventory.client import InventoryClient
from app.saga.repository import SagaStateRepository
class OrderSagaOrchestrator:
def __init__(
self,
session: AsyncSession,
order_repo: OrderRepository,
payment_client: PaymentClient,
inventory_client: InventoryClient,
saga_repo: SagaStateRepository,
) -> None:
self._session = session
self._order_repo = order_repo
self._payment_client = payment_client
self._inventory_client = inventory_client
self._saga_repo = saga_repo
async def run(self, command: CreateOrderCommand) -> None:
saga_id = uuid.uuid4()
await self._saga_repo.create(saga_id, "order_creation", command.model_dump())
order_id: uuid.UUID | None = None
payment_id: uuid.UUID | None = None
try:
await self._saga_repo.update_step(saga_id, "CREATE_ORDER")
order_id = await self._order_repo.create(saga_id, command)
await self._session.commit()
await self._saga_repo.update_step(saga_id, "CHARGE_PAYMENT")
payment_id = await self._payment_client.charge(saga_id, order_id, command.amount)
await self._saga_repo.update_step(saga_id, "RESERVE_INVENTORY")
await self._inventory_client.reserve(saga_id, order_id, command.items)
await self._saga_repo.update_step(saga_id, "CONFIRM_ORDER")
await self._order_repo.confirm(saga_id, order_id)
await self._session.commit()
await self._saga_repo.complete(saga_id)
except Exception as exc:
await self._compensate(saga_id, order_id, payment_id)
raise SagaFailedError(saga_id) from exc
async def _compensate(
self,
saga_id: uuid.UUID,
order_id: uuid.UUID | None,
payment_id: uuid.UUID | None,
) -> None:
await self._saga_repo.update_status(saga_id, "COMPENSATING")
if payment_id is not None:
await self._payment_client.refund(saga_id, payment_id)
if order_id is not None:
await self._order_repo.cancel(saga_id, order_id)
await self._session.commit()
await self._saga_repo.update_status(saga_id, "FAILED")
Каждый commit() — локальная транзакция своего сервиса. Между шагами нет открытой распределённой транзакции. Если orchestrator упал на шаге 3, recovery-фоновая задача читает IN_PROGRESS-саги из таблицы и продолжает.
Choreography — для simple sagas
R-DIST-SAGA-3: choreography — для 2-3 шагов без branching. Каждый сервис публикует события и реагирует на события других. Центральный координатор отсутствует.
order.created → payment-service charges → payment.charged → order-service confirms
↘ payment.failed → order-service cancels
Реализация через aiokafka-консьюмер в payment-service:
# payment/consumers/order_created_consumer.py
from aiokafka import AIOKafkaConsumer
from app.payment.use_cases.charge_payment import ChargePaymentUseCase
from app.outbox.publisher import OutboxPublisher
class OrderCreatedConsumer:
def __init__(
self,
consumer: AIOKafkaConsumer,
charge_use_case: ChargePaymentUseCase,
outbox: OutboxPublisher,
) -> None:
self._consumer = consumer
self._charge = charge_use_case
self._outbox = outbox
async def run(self) -> None:
async for msg in self._consumer:
event = OrderCreatedEvent.model_validate_json(msg.value)
try:
payment_id = await self._charge.execute(event.saga_id, event.order_id, event.amount)
await self._outbox.publish(PaymentChargedEvent(saga_id=event.saga_id, payment_id=payment_id))
except InsufficientFundsError:
await self._outbox.publish(PaymentFailedEvent(saga_id=event.saga_id, reason="insufficient_funds"))
И реакция в order-service:
# order/consumers/payment_result_consumer.py
class PaymentResultConsumer:
async def run(self) -> None:
async for msg in self._consumer:
if msg.key == b"payment.charged":
event = PaymentChargedEvent.model_validate_json(msg.value)
async with self._session.begin():
order = await self._order_repo.find_by_saga_id(event.saga_id)
order.confirm()
elif msg.key == b"payment.failed":
event = PaymentFailedEvent.model_validate_json(msg.value)
async with self._session.begin():
order = await self._order_repo.find_by_saga_id(event.saga_id)
order.cancel()
| Параметр | Orchestration | Choreography |
|---|---|---|
| Шагов | 4+ | 2-3 |
| Branching | да | нет |
| Видимость flow | один класс | N сервисов |
| Где state | у orchestrator-а | у каждого сервиса |
| Сложность реализации | средняя | низкая (на старте) |
| Сложность отладки | средняя | высокая при росте |
Saga state в PostgreSQL
R-DIST-SAGA-4: state саги хранится в БД (saga_<name> таблица). Это даёт recovery, видимость и audit.
# saga/models.py
from sqlalchemy import Column, String, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from app.db.base import Base
import uuid
class SagaOrderCreation(Base):
__tablename__ = "saga_order_creation"
saga_id = Column(PGUUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
status = Column(String, nullable=False) # IN_PROGRESS | COMPLETED | FAILED | COMPENSATING
current_step = Column(String, nullable=False)
payload = Column(JSON, nullable=False)
started_at = Column(DateTime(timezone=True), nullable=False)
completed_at = Column(DateTime(timezone=True))
last_error = Column(String)
CREATE INDEX ix_saga_order_creation_active
ON saga_order_creation (status)
WHERE status IN ('IN_PROGRESS', 'COMPENSATING');
Partial index по активным статусам — потому что 99% строк быстро становятся COMPLETED, искать нужно только активные.
Recovery фоновая задача (запускается при старте FastAPI через lifespan):
# saga/recovery.py
async def recover_in_flight_sagas(orchestrator: OrderSagaOrchestrator, saga_repo: SagaStateRepository) -> None:
in_progress = await saga_repo.find_by_status(["IN_PROGRESS", "COMPENSATING"])
for saga_state in in_progress:
await orchestrator.resume(saga_state)
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
await recover_in_flight_sagas(orchestrator, saga_repo)
yield
saga_id сквозной
R-DIST-SAGA-5: saga_id (UUID) проходит через каждое сообщение и каждый HTTP-запрос между сервисами.
# В aiokafka-сообщении — поле в Pydantic-модели
class OrderCreatedEvent(BaseModel):
saga_id: uuid.UUID
event_id: uuid.UUID
event_type: str = "OrderCreated.v1"
order_id: uuid.UUID
customer_id: uuid.UUID
amount: Decimal
# HTTP-заголовок при межсервисном вызове
import httpx
async def call_payment_service(saga_id: uuid.UUID, payload: dict) -> dict:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service/internal/payments",
json=payload,
headers={"X-Saga-Id": str(saga_id)},
)
response.raise_for_status()
return response.json()
В таблицах сервисов — колонка saga_id UUID с индексом:
ALTER TABLE orders ADD COLUMN saga_id UUID;
CREATE INDEX ix_orders_saga_id ON orders (saga_id);
Это даёт SELECT * FROM orders WHERE saga_id = $1 — что произошло в этой саге в конкретном сервисе.
Разделение handler и orchestrator
R-DIST-SAGA-X4: handler UseCase делает только локальный шаг и публикует событие через outbox. Orchestrator — отдельный компонент, который реагирует на это событие.
# ПЛОХО — сага встроена в handler use case-а
class CreateOrderHandler:
async def handle(self, command: CreateOrderCommand) -> Order:
async with self._session.begin():
order = Order.create(command)
self._session.add(order)
# HTTP-вызов вне транзакции — нет атомарности с созданием заказа
payment_id = await self._payment_client.charge(order.id, command.amount)
try:
await self._inventory_client.reserve(order.id, command.items)
except Exception:
await self._payment_client.refund(payment_id) # compensation в handler!
raise
return order
# ХОРОШО — handler делает локальный шаг, orchestrator реагирует отдельно
class CreateOrderHandler:
async def handle(self, command: CreateOrderCommand) -> Order:
async with self._session.begin():
order = Order.start(command)
self._session.add(order)
await self._outbox.publish(OrderStartedEvent(saga_id=order.saga_id, order_id=order.id))
return order
class OrderSagaOrchestrator:
async def on_order_started(self, event: OrderStartedEvent) -> None:
# реагирует на событие, ведёт следующие шаги
await self._saga_repo.update_step(event.saga_id, "CHARGE_PAYMENT")
payment_id = await self._payment_client.charge(event.saga_id, event.order_id, ...)
...
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
2PC/XA: несколько AsyncSession с последовательными commit() через разные PG | R-DIST-SAGA-X1, R-DIST-TX-X3 | saga с локальными UoW-транзакциями |
| Saga без compensation-команд | R-DIST-SAGA-X2 | каждый шаг имеет парную compensation |
Saga state в dict или instance-переменной orchestrator-а | R-DIST-SAGA-X3 | saga_<name> таблица в PG + recovery |
Saga смешана с UseCase в одном handle()-методе | R-DIST-SAGA-X4 | отдельный OrderSagaOrchestrator |
HTTP-вызов вне async with session.begin() без outbox | R-DIST-SAGA-X4 | outbox → событие → orchestrator |
| Choreography на 5+ шагов без координатора | R-DIST-SAGA-2 | orchestration с центральным классом |
producer.send() прямо из command-handler | R-DIST-OBX-X1 | outbox-запись в той же транзакции |
Куда дальше
- Compensation — semantic state-change, не DELETE; идемпотентность compensation в Python.
- Distributed transactions — почему цепочка
AsyncSession.commit()через разные БД не даёт атомарности. - Eventual consistency — read-your-writes для in-flight саги через FastAPI.
- Idempotency — каждый шаг саги обязан быть идемпотентным;
processed_event-таблица. - Outbox + Inbox — публикация шагов и событий саги через outbox в той же UoW.
- Когда нужны распределённые паттерны — alternatives check перед saga: объединить BC, modular monolith.