Опирается на правила:
R-DIST-TX-X1…R-DIST-TX-X3иR-DIST-TX-1…R-DIST-TX-3из Distributed Patterns Rules → раздел 7. Distributed transactions — что НЕ делать.
Важно знать
- 2PC/XA в Python-стеке запрещён — Kafka не поддерживает XA, под нагрузкой
prepare-lock блокирует все участвующие ресурсы, transaction coordinator — single point of failure.- Последовательный commit двух
AsyncSessionчерез разные engine — best-effort, не атомарность. Сбой между commit-ами оставляет inconsistency без recovery-плана (R-DIST-TX-X3).- Альтернатива 1: saga с UoW — каждый шаг это локальный
async with session.begin()в одном PG, compensation на сбое (R-DIST-TX-1).- Альтернатива 2: outbox + idempotent consumer —
INSERTданных +INSERTв outbox в одной транзакции, relay публикует в Kafka (R-DIST-TX-2).- Альтернатива 3: modular monolith — несколько BC в одном процессе с одним PG; UoW работает, никаких distributed-паттернов (
R-DIST-TX-3).- «Нам нужна immediate consistency между сервисами» — почти всегда либо неверная граница BC, либо реальная EC, которую надо задекларировать.
- SQLAlchemy
TwoPhaseTransaction(session.begin_twophase()) существует для XA-совместимых RDBMS, но не помогает с Kafka и не решает проблему coordinator-а.
В мире JEE двухфазный коммит казался элегантным: один @Transactional и все ресурсы атомарны. В Python-экосистеме с aiokafka, SQLAlchemy AsyncSession и Kubernetes этот подход разваливается по нескольким причинам, которые не связаны с качеством реализации — они архитектурные.
Почему 2PC/XA не работает в Python-стеке
Kafka не поддерживает XA
Kafka реализует собственную транзакционную семантику (producer transactional_id, read_committed isolation) — и она не совместима с XA-протоколом. Попытка включить Kafka в 2PC как XA-ресурс невозможна технически. Это же касается Redis, ClickHouse, S3 — весь современный data stack, с которым работает Python-сервис, за пределами старых XA-совместимых RDBMS.
Если операция затрагивает PG и Kafka одновременно — 2PC не вариант даже при большом желании.
prepare-lock не масштабируется
Two-phase commit держит блокировку на всех участвующих ресурсах между фазой PREPARE и фазой COMMIT. При четырёх шагах через распределённую транзакцию — четыре ресурса залочены на время всего round-trip (сетевые hop-ы, latency брокера, IO). Под нагрузкой это становится bottleneck: throughput падает, конкурирующие транзакции ждут в очереди, вероятность deadlock растёт.
Transaction coordinator — single point of failure
XA требует отдельного процесса, который ведёт состояние каждой distributed transaction. Если coordinator упал в момент «PREPARE отправлен, COMMIT не отправлен» — все участвующие ресурсы зависают в IN_DOUBT до его восстановления. В Kubernetes с rolling restart и network partition coordinator ломается регулярно.
SQLAlchemy TwoPhaseTransaction — не выход
SQLAlchemy предоставляет session.begin_twophase() для XA-совместимых RDBMS (PostgreSQL поддерживает PREPARE TRANSACTION). Это работает для двух PG-сессий при определённых условиях, но:
- не решает проблему с Kafka (не XA);
- требует отдельного recovery-процесса для
IN_DOUBTтранзакций; - плохо работает в async-контексте (не всегда потокобезопасен с
AsyncSession); - PostgreSQL рекомендует не использовать
PREPARE TRANSACTIONв production без явного recovery-плана.
# НЕ ДЕЛАТЬ — выглядит атомарно, но требует coordinator + XA-совместимый ресурс
async with session_order.begin_twophase() as tx_order:
async with session_payment.begin_twophase() as tx_payment:
await session_order.execute(insert(Order).values(...))
await session_payment.execute(insert(Payment).values(...))
await tx_payment.prepare() # PREPARE TRANSACTION в PG
await tx_order.prepare() # PREPARE TRANSACTION в PG
await tx_payment.commit()
await tx_order.commit()
# Если здесь упало — обе транзакции IN_DOUBT, ручной recovery
Последовательный commit двух AsyncSession — best-effort
R-DIST-TX-X3: самый распространённый антипаттерн в Python — два engine, два AsyncSession, commit по очереди.
# ОПАСНО — выглядит как одна операция, но не атомарно
async def create_order_with_payment(cmd: CreateOrderCommand) -> None:
async with order_session.begin():
await order_session.execute(insert(Order).values(
id=cmd.order_id,
customer_id=cmd.customer_id,
))
# order committed — откатить уже нельзя
async with payment_session.begin():
await payment_session.execute(insert(Payment).values(
order_id=cmd.order_id,
amount=cmd.amount,
))
# если здесь исключение — order есть, payment нет
Что ломается: если второй begin() выбросил исключение (сеть, constraint, OOM) — первый commit уже произошёл. Заказ создан, платёж не создан. Нет встроенного способа откатить уже завершённую транзакцию.
Это не гипотетическая ситуация — сетевые сбои, transient PG-ошибки, исчерпание connection pool случаются на production.
Альтернативы — что использовать вместо 2PC
1. Saga с локальными UoW
R-DIST-TX-1: стандарт UCP для cross-service flow. Каждый шаг — локальный async with session.begin() в своём сервисе и своём PG. Orchestrator хранит saga state в БД, при сбое шага вызывает compensation.
# order-service: OrderSagaOrchestrator
class OrderSagaOrchestrator:
def __init__(
self,
session_factory: async_sessionmaker[AsyncSession],
outbox: OutboxRepository,
) -> None:
self._session_factory = session_factory
self._outbox = outbox
async def start(self, cmd: CreateOrderCommand) -> uuid.UUID:
saga_id = uuid.uuid4()
async with self._session_factory() as session:
async with session.begin():
await session.execute(
insert(SagaState).values(
saga_id=saga_id,
saga_type="create_order",
status="STARTED",
current_step="RESERVE_INVENTORY",
payload=cmd.model_dump_json(),
)
)
await self._outbox.append(
session,
topic="inventory.commands",
key=str(cmd.order_id),
payload=ReserveInventoryCommand(
saga_id=saga_id,
order_id=cmd.order_id,
items=cmd.items,
).model_dump_json(),
)
return saga_id
async def on_inventory_reserved(self, event: InventoryReservedEvent) -> None:
async with self._session_factory() as session:
async with session.begin():
await session.execute(
update(SagaState)
.where(SagaState.saga_id == event.saga_id)
.values(current_step="CHARGE_PAYMENT", status="IN_PROGRESS")
)
await self._outbox.append(
session,
topic="payment.commands",
key=str(event.order_id),
payload=ChargePaymentCommand(
saga_id=event.saga_id,
order_id=event.order_id,
amount=event.total_amount,
).model_dump_json(),
)
async def on_payment_failed(self, event: PaymentFailedEvent) -> None:
async with self._session_factory() as session:
async with session.begin():
await session.execute(
update(SagaState)
.where(SagaState.saga_id == event.saga_id)
.values(status="COMPENSATING")
)
await self._outbox.append(
session,
topic="inventory.commands",
key=str(event.order_id),
payload=ReleaseInventoryCommand(
saga_id=event.saga_id,
order_id=event.order_id,
).model_dump_json(),
)
Каждый session.begin() — одна PG-транзакция. Failure одного шага не делает предыдущие «висящими»: у саги есть state, orchestrator знает, какой шаг compensation вызывать.
2. Outbox + idempotent consumer
R-DIST-TX-2: для event-driven sync без явных rollback-ов. INSERT данных и INSERT в outbox — одна транзакция в одном PG. Relay читает outbox и публикует в Kafka.
# order-service: CreateOrderHandler
class CreateOrderHandler:
def __init__(
self,
session_factory: async_sessionmaker[AsyncSession],
outbox: OutboxRepository,
) -> None:
self._session_factory = session_factory
self._outbox = outbox
async def handle(self, cmd: CreateOrderCommand) -> OrderId:
async with self._session_factory() as session:
async with session.begin():
order_id = uuid.uuid4()
await session.execute(
insert(Order).values(
id=order_id,
customer_id=cmd.customer_id,
status="PENDING",
total=cmd.total_amount,
)
)
# INSERT в outbox — та же транзакция, атомарно
await self._outbox.append(
session,
topic="orders.events",
key=str(order_id),
payload=OrderCreatedEvent(
order_id=order_id,
customer_id=cmd.customer_id,
items=cmd.items,
).model_dump_json(),
)
return OrderId(order_id)
# outbox-relay: читает outbox, публикует в Kafka
class OutboxRelay:
async def run(self) -> None:
while True:
async with self._session_factory() as session:
async with session.begin():
rows = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.published_at.is_(None))
.order_by(OutboxEvent.created_at)
.limit(100)
.with_for_update(skip_locked=True)
)
events = rows.scalars().all()
for event in events:
await self._producer.send(
topic=event.topic,
key=event.key.encode(),
value=event.payload.encode(),
)
event.published_at = datetime.utcnow()
await asyncio.sleep(0.5)
Если relay упал между публикацией и пометкой published_at — при следующем запуске отправит повторно. Receiver идемпотентен по event_id в processed_event.
3. Modular monolith
R-DIST-TX-3: для tight coupling — несколько BC в одном процессе, один PG, один AsyncSession на запрос.
# один сервис, один PG — saga не нужна
class CreateOrderHandler:
def __init__(
self,
session_factory: async_sessionmaker[AsyncSession],
inventory: InventoryService, # в том же процессе
payment: PaymentService, # в том же процессе
) -> None:
self._session_factory = session_factory
self._inventory = inventory
self._payment = payment
async def handle(self, cmd: CreateOrderCommand) -> OrderId:
async with self._session_factory() as session:
async with session.begin():
order_id = uuid.uuid4()
await session.execute(
insert(Order).values(
id=order_id,
customer_id=cmd.customer_id,
status="PENDING",
)
)
# inventory и payment работают с той же session
await self._inventory.reserve(session, order_id, cmd.items)
await self._payment.charge(session, order_id, cmd.amount)
# один commit — все три операции атомарны
return OrderId(order_id)
Все три операции в одном session.begin() — ACID локально. Никаких саг, никакой distributed-сложности. Когда команда и бизнес-требования потребуют независимого масштабирования — разделяем на сервисы, появляется saga.
«Нужна immediate consistency между сервисами»
Это требование почти всегда означает одно из трёх:
- Неверная граница BC — две операции, требующие immediate consistency, скорее всего принадлежат одному Bounded Context. Решение — объединить в один сервис с локальным UoW.
- Реальная eventual consistency — не нужна атомарность, нужна быстрая EC (< 1 секунды) с декларацией в OpenAPI. Outbox + relay даёт такую задержку при нормальной нагрузке.
- Read-your-writes — клиенту важно сразу увидеть свой результат, но не нужна атомарность с другим сервисом. Решается чтением из write-side в endpoint или версионным токеном.
«Хочу 2PC» — почти всегда симптом, не требование.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
2PC / XA через session.begin_twophase() для cross-service | R-DIST-TX-X1 | saga с локальными UoW |
Последовательный commit двух AsyncSession / двух engine | R-DIST-TX-X3 | один PG (modular monolith) или saga |
PREPARE TRANSACTION в PG без recovery-процесса | R-DIST-TX-X1 | outbox + idempotent consumer |
producer.send() из handler без outbox, после session.commit() | R-DIST-OBX-X1 | outbox в той же транзакции |
| Kafka в роли XA-ресурса в distributed transaction | R-DIST-TX-X1 | transactional producer + outbox |
| «Try-Cancel-Confirm» вручную поверх двух сервисов | R-DIST-TX-X1 | стандартизованная saga с compensation |
Куда дальше
- Distributed Patterns → раздел 7. Distributed transactions — нормативные формулировки правил.
- Saga — главная альтернатива 2PC: orchestrator, UoW на каждом шаге, saga state в БД.
- Outbox + Inbox — атомарный «commit + publish» через одну PG-транзакцию.
- Eventual consistency — что делать с требованием «immediate» и как задекларировать EC в FastAPI.
- Idempotency —
processed_event,Idempotency-Key, двойная защита для money-операций. - Compensation — semantic rollback вместо DELETE, audit trail, DLQ при сбое compensation.
- Когда нужны распределённые паттерны — modular monolith как альтернатива микросервисной сложности.