Опирается на правила: R-DIST-TX-X1R-DIST-TX-X3 и R-DIST-TX-1R-DIST-TX-3 из Distributed Patterns Rules → раздел 7. Distributed transactions — что НЕ делать.

Важно знать

  • 2PC/XA в Python-стеке запрещён — Kafka не поддерживает XA, под нагрузкой prepare-lock блокирует все участвующие ресурсы, transaction coordinator — single point of failure.
  • Последовательный commit двух AsyncSession через разные engine — best-effort, не атомарность. Сбой между commit-ами оставляет inconsistency без recovery-плана (R-DIST-TX-X3).
  • Альтернатива 1: saga с UoW — каждый шаг это локальный async with session.begin() в одном PG, compensation на сбое (R-DIST-TX-1).
  • Альтернатива 2: outbox + idempotent consumerINSERT данных + INSERT в outbox в одной транзакции, relay публикует в Kafka (R-DIST-TX-2).
  • Альтернатива 3: modular monolith — несколько BC в одном процессе с одним PG; UoW работает, никаких distributed-паттернов (R-DIST-TX-3).
  • «Нам нужна immediate consistency между сервисами» — почти всегда либо неверная граница BC, либо реальная EC, которую надо задекларировать.
  • SQLAlchemy TwoPhaseTransaction (session.begin_twophase()) существует для XA-совместимых RDBMS, но не помогает с Kafka и не решает проблему coordinator-а.

В мире JEE двухфазный коммит казался элегантным: один @Transactional и все ресурсы атомарны. В Python-экосистеме с aiokafka, SQLAlchemy AsyncSession и Kubernetes этот подход разваливается по нескольким причинам, которые не связаны с качеством реализации — они архитектурные.

Почему 2PC/XA не работает в Python-стеке

Kafka не поддерживает XA

Kafka реализует собственную транзакционную семантику (producer transactional_id, read_committed isolation) — и она не совместима с XA-протоколом. Попытка включить Kafka в 2PC как XA-ресурс невозможна технически. Это же касается Redis, ClickHouse, S3 — весь современный data stack, с которым работает Python-сервис, за пределами старых XA-совместимых RDBMS.

Если операция затрагивает PG и Kafka одновременно — 2PC не вариант даже при большом желании.

prepare-lock не масштабируется

Two-phase commit держит блокировку на всех участвующих ресурсах между фазой PREPARE и фазой COMMIT. При четырёх шагах через распределённую транзакцию — четыре ресурса залочены на время всего round-trip (сетевые hop-ы, latency брокера, IO). Под нагрузкой это становится bottleneck: throughput падает, конкурирующие транзакции ждут в очереди, вероятность deadlock растёт.

Transaction coordinator — single point of failure

XA требует отдельного процесса, который ведёт состояние каждой distributed transaction. Если coordinator упал в момент «PREPARE отправлен, COMMIT не отправлен» — все участвующие ресурсы зависают в IN_DOUBT до его восстановления. В Kubernetes с rolling restart и network partition coordinator ломается регулярно.

SQLAlchemy TwoPhaseTransaction — не выход

SQLAlchemy предоставляет session.begin_twophase() для XA-совместимых RDBMS (PostgreSQL поддерживает PREPARE TRANSACTION). Это работает для двух PG-сессий при определённых условиях, но:

  • не решает проблему с Kafka (не XA);
  • требует отдельного recovery-процесса для IN_DOUBT транзакций;
  • плохо работает в async-контексте (не всегда потокобезопасен с AsyncSession);
  • PostgreSQL рекомендует не использовать PREPARE TRANSACTION в production без явного recovery-плана.
# НЕ ДЕЛАТЬ — выглядит атомарно, но требует coordinator + XA-совместимый ресурс
async with session_order.begin_twophase() as tx_order:
    async with session_payment.begin_twophase() as tx_payment:
        await session_order.execute(insert(Order).values(...))
        await session_payment.execute(insert(Payment).values(...))
        await tx_payment.prepare()   # PREPARE TRANSACTION в PG
        await tx_order.prepare()     # PREPARE TRANSACTION в PG
        await tx_payment.commit()
        await tx_order.commit()
        # Если здесь упало — обе транзакции IN_DOUBT, ручной recovery

Последовательный commit двух AsyncSession — best-effort

R-DIST-TX-X3: самый распространённый антипаттерн в Python — два engine, два AsyncSession, commit по очереди.

# ОПАСНО — выглядит как одна операция, но не атомарно
async def create_order_with_payment(cmd: CreateOrderCommand) -> None:
    async with order_session.begin():
        await order_session.execute(insert(Order).values(
            id=cmd.order_id,
            customer_id=cmd.customer_id,
        ))
    # order committed — откатить уже нельзя

    async with payment_session.begin():
        await payment_session.execute(insert(Payment).values(
            order_id=cmd.order_id,
            amount=cmd.amount,
        ))
    # если здесь исключение — order есть, payment нет

Что ломается: если второй begin() выбросил исключение (сеть, constraint, OOM) — первый commit уже произошёл. Заказ создан, платёж не создан. Нет встроенного способа откатить уже завершённую транзакцию.

Это не гипотетическая ситуация — сетевые сбои, transient PG-ошибки, исчерпание connection pool случаются на production.

Альтернативы — что использовать вместо 2PC

1. Saga с локальными UoW

R-DIST-TX-1: стандарт UCP для cross-service flow. Каждый шаг — локальный async with session.begin() в своём сервисе и своём PG. Orchestrator хранит saga state в БД, при сбое шага вызывает compensation.

# order-service: OrderSagaOrchestrator
class OrderSagaOrchestrator:
    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        outbox: OutboxRepository,
    ) -> None:
        self._session_factory = session_factory
        self._outbox = outbox

    async def start(self, cmd: CreateOrderCommand) -> uuid.UUID:
        saga_id = uuid.uuid4()
        async with self._session_factory() as session:
            async with session.begin():
                await session.execute(
                    insert(SagaState).values(
                        saga_id=saga_id,
                        saga_type="create_order",
                        status="STARTED",
                        current_step="RESERVE_INVENTORY",
                        payload=cmd.model_dump_json(),
                    )
                )
                await self._outbox.append(
                    session,
                    topic="inventory.commands",
                    key=str(cmd.order_id),
                    payload=ReserveInventoryCommand(
                        saga_id=saga_id,
                        order_id=cmd.order_id,
                        items=cmd.items,
                    ).model_dump_json(),
                )
        return saga_id

    async def on_inventory_reserved(self, event: InventoryReservedEvent) -> None:
        async with self._session_factory() as session:
            async with session.begin():
                await session.execute(
                    update(SagaState)
                    .where(SagaState.saga_id == event.saga_id)
                    .values(current_step="CHARGE_PAYMENT", status="IN_PROGRESS")
                )
                await self._outbox.append(
                    session,
                    topic="payment.commands",
                    key=str(event.order_id),
                    payload=ChargePaymentCommand(
                        saga_id=event.saga_id,
                        order_id=event.order_id,
                        amount=event.total_amount,
                    ).model_dump_json(),
                )

    async def on_payment_failed(self, event: PaymentFailedEvent) -> None:
        async with self._session_factory() as session:
            async with session.begin():
                await session.execute(
                    update(SagaState)
                    .where(SagaState.saga_id == event.saga_id)
                    .values(status="COMPENSATING")
                )
                await self._outbox.append(
                    session,
                    topic="inventory.commands",
                    key=str(event.order_id),
                    payload=ReleaseInventoryCommand(
                        saga_id=event.saga_id,
                        order_id=event.order_id,
                    ).model_dump_json(),
                )

Каждый session.begin() — одна PG-транзакция. Failure одного шага не делает предыдущие «висящими»: у саги есть state, orchestrator знает, какой шаг compensation вызывать.

2. Outbox + idempotent consumer

R-DIST-TX-2: для event-driven sync без явных rollback-ов. INSERT данных и INSERT в outbox — одна транзакция в одном PG. Relay читает outbox и публикует в Kafka.

# order-service: CreateOrderHandler
class CreateOrderHandler:
    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        outbox: OutboxRepository,
    ) -> None:
        self._session_factory = session_factory
        self._outbox = outbox

    async def handle(self, cmd: CreateOrderCommand) -> OrderId:
        async with self._session_factory() as session:
            async with session.begin():
                order_id = uuid.uuid4()
                await session.execute(
                    insert(Order).values(
                        id=order_id,
                        customer_id=cmd.customer_id,
                        status="PENDING",
                        total=cmd.total_amount,
                    )
                )
                # INSERT в outbox — та же транзакция, атомарно
                await self._outbox.append(
                    session,
                    topic="orders.events",
                    key=str(order_id),
                    payload=OrderCreatedEvent(
                        order_id=order_id,
                        customer_id=cmd.customer_id,
                        items=cmd.items,
                    ).model_dump_json(),
                )
        return OrderId(order_id)
# outbox-relay: читает outbox, публикует в Kafka
class OutboxRelay:
    async def run(self) -> None:
        while True:
            async with self._session_factory() as session:
                async with session.begin():
                    rows = await session.execute(
                        select(OutboxEvent)
                        .where(OutboxEvent.published_at.is_(None))
                        .order_by(OutboxEvent.created_at)
                        .limit(100)
                        .with_for_update(skip_locked=True)
                    )
                    events = rows.scalars().all()
                    for event in events:
                        await self._producer.send(
                            topic=event.topic,
                            key=event.key.encode(),
                            value=event.payload.encode(),
                        )
                        event.published_at = datetime.utcnow()
            await asyncio.sleep(0.5)

Если relay упал между публикацией и пометкой published_at — при следующем запуске отправит повторно. Receiver идемпотентен по event_id в processed_event.

3. Modular monolith

R-DIST-TX-3: для tight coupling — несколько BC в одном процессе, один PG, один AsyncSession на запрос.

# один сервис, один PG — saga не нужна
class CreateOrderHandler:
    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        inventory: InventoryService,   # в том же процессе
        payment: PaymentService,       # в том же процессе
    ) -> None:
        self._session_factory = session_factory
        self._inventory = inventory
        self._payment = payment

    async def handle(self, cmd: CreateOrderCommand) -> OrderId:
        async with self._session_factory() as session:
            async with session.begin():
                order_id = uuid.uuid4()
                await session.execute(
                    insert(Order).values(
                        id=order_id,
                        customer_id=cmd.customer_id,
                        status="PENDING",
                    )
                )
                # inventory и payment работают с той же session
                await self._inventory.reserve(session, order_id, cmd.items)
                await self._payment.charge(session, order_id, cmd.amount)
                # один commit — все три операции атомарны
        return OrderId(order_id)

Все три операции в одном session.begin() — ACID локально. Никаких саг, никакой distributed-сложности. Когда команда и бизнес-требования потребуют независимого масштабирования — разделяем на сервисы, появляется saga.

«Нужна immediate consistency между сервисами»

Это требование почти всегда означает одно из трёх:

  1. Неверная граница BC — две операции, требующие immediate consistency, скорее всего принадлежат одному Bounded Context. Решение — объединить в один сервис с локальным UoW.
  2. Реальная eventual consistency — не нужна атомарность, нужна быстрая EC (< 1 секунды) с декларацией в OpenAPI. Outbox + relay даёт такую задержку при нормальной нагрузке.
  3. Read-your-writes — клиенту важно сразу увидеть свой результат, но не нужна атомарность с другим сервисом. Решается чтением из write-side в endpoint или версионным токеном.

«Хочу 2PC» — почти всегда симптом, не требование.

Что запрещено

АнтипаттернПравилоЧто взамен
2PC / XA через session.begin_twophase() для cross-serviceR-DIST-TX-X1saga с локальными UoW
Последовательный commit двух AsyncSession / двух engineR-DIST-TX-X3один PG (modular monolith) или saga
PREPARE TRANSACTION в PG без recovery-процессаR-DIST-TX-X1outbox + idempotent consumer
producer.send() из handler без outbox, после session.commit()R-DIST-OBX-X1outbox в той же транзакции
Kafka в роли XA-ресурса в distributed transactionR-DIST-TX-X1transactional producer + outbox
«Try-Cancel-Confirm» вручную поверх двух сервисовR-DIST-TX-X1стандартизованная saga с compensation

Куда дальше

  • Distributed Patterns → раздел 7. Distributed transactions — нормативные формулировки правил.
  • Saga — главная альтернатива 2PC: orchestrator, UoW на каждом шаге, saga state в БД.
  • Outbox + Inbox — атомарный «commit + publish» через одну PG-транзакцию.
  • Eventual consistency — что делать с требованием «immediate» и как задекларировать EC в FastAPI.
  • Idempotency — processed_event, Idempotency-Key, двойная защита для money-операций.
  • Compensation — semantic rollback вместо DELETE, audit trail, DLQ при сбое compensation.
  • Когда нужны распределённые паттерны — modular monolith как альтернатива микросервисной сложности.