Опирается на правила: R-SQLA-SESS-1R-SQLA-SESS-4 и R-SQLA-SESS-X1R-SQLA-SESS-X3 из SQLAlchemy Style Guide → раздел 4. Сессия и транзакции.

Важно знать

  • Граница транзакции — на Handler через Unit of Work (async with uow: ... await uow.commit()), не в репозитории и не в функции запроса.
  • AsyncSession per-request: создаётся через Depends/контейнер, не хранится как атрибут модуля или глобальная переменная.
  • expire_on_commit=True (дефолт SQLAlchemy) + обращение к ORM-объекту после commit в async = MissingGreenlet. Маппи в доменный объект до commit.
  • structlog.contextvars.bind_contextvars(step=..., use_case=..., aggregate_id=...) вызывается до шагов — исключение из UoW залогирует edge-handler с полным контекстом.
  • Исключение не глотать: транзакция откатывается автоматически через session.begin(), исключение всплывает на edge-handler для маппинга в problem+json.
  • Read-методы работают без commit; запись — только через UoW-команду.
  • Несколько репозиториев внутри одного Handler'а получают одну сессию — все операции в одной транзакции.

Транзакция — это граница атомарного куска работы с базой. В UCP бизнес-операция = use case (ConfirmOrder, ProcessPayment, RegisterCustomer). Use case реализуется одним Handler'ом. Поэтому транзакция = Handler. Это снимает вопрос «где граница» и даёт единое правило: если нужна транзакция — она открывается в Handler через Unit of Work.

В Python нет аннотаций вроде @Transactional — транзакция открывается явно через session.begin() или через UoW-контекстный менеджер. Это делает границу видимой: читая Handler, сразу понятно, где начинается и заканчивается атомарный кусок работы.

Unit of Work на Handler'е

R-SQLA-SESS-1: граница транзакции — на Handler. UoW оборачивает сессию и предоставляет commit(); Handler не вызывает session.commit() напрямую.

# core/order/handler/confirm_order_handler.py

class ConfirmOrderHandler:
    def __init__(
        self,
        session_factory: AsyncSessionFactory,
        orders: OrderRepository,
        outbox: OutboxRepository,
    ) -> None:
        self._session_factory = session_factory
        self._orders = orders
        self._outbox = outbox

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        structlog.contextvars.bind_contextvars(
            use_case="ConfirmOrder",
            aggregate_id=str(cmd.order_id),
        )
        async with self._session_factory() as session, session.begin():
            structlog.contextvars.bind_contextvars(step="load")
            order = await self._orders.get(session, cmd.order_id)

            structlog.contextvars.bind_contextvars(step="confirm")
            order.confirm()

            structlog.contextvars.bind_contextvars(step="save")
            await self._orders.save(session, order)

            structlog.contextvars.bind_contextvars(step="outbox")
            self._outbox.add(session, order.pull_events())

            return order.id

async with session.begin() — контекстный менеджер, который открывает транзакцию и при выходе без исключения делает commit, при исключении — rollback. session_factory — это async_sessionmaker, проброшенный через DI.

Что это даёт:

  • Чёткая граница. Весь Handler — одна транзакция. Order и outbox-запись либо сохраняются вместе, либо не сохраняются вовсе.
  • Нет частичных эффектов. Если на шаге outbox упало исключение — order.save откатится автоматически. Нет состояния «заказ подтверждён, событие не записано».
  • Контекст в логах. bind_contextvars перед шагом означает, что исключение, всплывающее из session.begin(), несёт информацию о том, на каком шаге произошёл сбой.

AsyncSession per-request

R-SQLA-SESS-2: сессия создаётся per-request. Не глобально, не на уровне модуля.

# adapters/in/api/deps.py

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

AsyncSessionFactory = async_sessionmaker[AsyncSession]

async def get_session(
    session_factory: Annotated[AsyncSessionFactory, Depends(get_session_factory)],
) -> AsyncGenerator[AsyncSession, None]:
    async with session_factory() as session:
        yield session
# adapters/in/api/order_router.py

@router.post("/orders/{order_id}/confirm", status_code=200)
async def confirm_order(
    order_id: UUID,
    session: Annotated[AsyncSession, Depends(get_session)],
    handler: Annotated[ConfirmOrderHandler, Depends(get_confirm_order_handler)],
) -> OrderConfirmedResponse:
    result = await handler.handle(ConfirmOrder(order_id=order_id))
    return OrderConfirmedResponse(order_id=result)

async_sessionmaker настраивается один раз при старте приложения и хранится как синглтон. Каждый запрос получает свою сессию — изолированную, с собственным connection из пула. Когда запрос завершён (нормально или с исключением), async with session_factory() закрывает сессию и возвращает connection в пул.

Глобальная сессия или сессия на уровне модуля — типичная ошибка при переносе кода из синхронного SQLAlchemy. В async-окружении это приведёт к гонке состояний: два запроса будут делить одну сессию, транзакции перепутаются.

Read-only запросы — без commit

R-SQLA-SESS-3: read-методы работают без commit. Репозиторий для чтения не входит в UoW-команду.

# core/order/handler/get_order_handler.py

class GetOrderHandler:
    def __init__(
        self,
        session_factory: AsyncSessionFactory,
        orders: OrderViewRepository,
    ) -> None:
        self._session_factory = session_factory
        self._orders = orders

    async def handle(self, query: GetOrder) -> OrderView:
        async with self._session_factory() as session:
            return await self._orders.find_by_id(session, query.order_id)

Read-handler не открывает явную транзакцию: один запрос не требует изоляции от других читателей и не должен держать соединение под транзакцией дольше нужного. Если нужен консистентный снимок (читаем несколько таблиц, важна единая точка видимости данных), открываем session.begin() явно:

async with self._session_factory() as session:
    async with session.begin():
        order = await self._orders.find_by_id(session, query.order_id)
        customer = await self._customers.find_by_id(session, order.customer_id)

Пример: страница заказа в Сбере — агрегат Order + профиль Customer для отображения. Без транзакции два запроса могут увидеть разное состояние Customer, если между ними прошло обновление.

Контекст шага в транзакции

R-SQLA-SESS-4: перед каждым шагом внутри session.begin() привязываем шаг к structlog.contextvars.

async def handle(self, cmd: ProcessPayment) -> PaymentId:
    structlog.contextvars.bind_contextvars(
        use_case="ProcessPayment",
        aggregate_id=str(cmd.payment_id),
    )
    async with self._session_factory() as session, session.begin():
        structlog.contextvars.bind_contextvars(step="load_order")
        order = await self._orders.get(session, cmd.order_id)

        structlog.contextvars.bind_contextvars(step="load_product")
        product = await self._products.get(session, order.product_id)

        structlog.contextvars.bind_contextvars(step="apply_payment")
        order.apply_payment(cmd.amount, product)

        structlog.contextvars.bind_contextvars(step="save_order")
        await self._orders.save(session, order)

        structlog.contextvars.bind_contextvars(step="write_outbox")
        self._outbox.add(session, order.pull_events())

        return PaymentId(cmd.payment_id)

Почему контекст привязывается до шага, а не в except:

  • Исключение из session.begin() автоматически откатывает транзакцию и всплывает наверх.
  • bind_contextvars — это thread-local / contextvars запись: она живёт весь request-цикл.
  • Централизованный edge-handler (FastAPI exception handler) логирует исключение с полным контекстом: step, use_case, aggregate_id, exc_info.
  • Не нужен try/except в Handler ради лога — это как раз R-SQLA-SESS-X3.

Для многошаговых саг с компенсацией допустим явный log.exception("step_failed", step=...) перед шагом компенсации — но только с re-raise или переводом в терминальное состояние агрегата.

Несколько репозиториев — одна сессия

Типовой сценарий: оформление заказа резервирует остатки Product и создаёт Order атомарно.

# core/order/handler/place_order_handler.py

class PlaceOrderHandler:
    def __init__(
        self,
        session_factory: AsyncSessionFactory,
        orders: OrderRepository,
        products: ProductRepository,
        outbox: OutboxRepository,
    ) -> None:
        self._session_factory = session_factory
        self._orders = orders
        self._products = products
        self._outbox = outbox

    async def handle(self, cmd: PlaceOrder) -> OrderId:
        structlog.contextvars.bind_contextvars(
            use_case="PlaceOrder",
            aggregate_id=str(cmd.product_id),
        )
        async with self._session_factory() as session, session.begin():
            structlog.contextvars.bind_contextvars(step="load_product")
            product = await self._products.get(session, cmd.product_id)

            structlog.contextvars.bind_contextvars(step="reserve")
            product.reserve(cmd.quantity)
            await self._products.save(session, product)

            structlog.contextvars.bind_contextvars(step="create_order")
            order = Order.create(cmd.customer_id, product, cmd.quantity)
            await self._orders.save(session, order)

            structlog.contextvars.bind_contextvars(step="outbox")
            self._outbox.add(session, order.pull_events())

            return order.id

Все три репозитория получают одну и ту же session. Если self._orders.save упадёт с нарушением уникального ключа — product.reserve откатится вместе с ним. Нет состояния «остатки списаны, заказ не создан».

Передача session явным параметром в каждый метод репозитория (R-SQLA-REPO-3) — не архаика: это единственный способ гарантировать, что все операции в одной транзакции без глобального состояния.

expire_on_commit и маппинг до commit

R-SQLA-SESS-X2: SQLAlchemy по умолчанию инвалидирует атрибуты ORM-объекта после commit (expire_on_commit=True). В async-окружении обращение к инвалидированному атрибуту поднимает MissingGreenlet — нет текущего цикла событий для выполнения ленивой загрузки.

# ПЛОХО — чтение атрибута ORM-модели после commit
async with session.begin():
    order_model = await session.get(OrderModel, order_id)
    order_model.status = "confirmed"

return order_model.id
# ХОРОШО — маппинг в доменный объект внутри transaction
async with self._session_factory() as session, session.begin():
    order_model = await session.get(OrderModel, order_id)
    order = mapper.to_domain(order_model)
    order.confirm()
    await session.merge(mapper.to_model(order))
    return order.id

Стандартное решение — работать с доменными объектами внутри транзакции, не с ORM-моделями. После commit в Handler возвращается доменный объект или его идентификатор — они не связаны с сессией.

Если нужно отключить expire: async_sessionmaker(expire_on_commit=False) — допустимо, но тогда нужно самостоятельно следить за тем, что данные после commit могут быть устаревшими.

Что запрещено

АнтипаттернПравилоЧто взамен
commit() / rollback() внутри репозиторияR-SQLA-SESS-X1Граница TX на Handler через session.begin()
expire_on_commit=True + обращение к ORM-объекту после commit в asyncR-SQLA-SESS-X2Маппинг в доменный объект до commit; или expire_on_commit=False
try: ... except: pass / log без re-raise внутри транзакцииR-SQLA-SESS-X3Не глотать: исключение откатывает TX автоматически, edge-handler логирует с contextvars
Ручной commit между шагами Handler'а (partial commit)R-SQLA-SESS-X3Один session.begin() на Handler = одна атомарная транзакция
Глобальная / модульная AsyncSessionR-SQLA-SESS-2async_sessionmaker per-request через Depends
select(...) / session.execute(...) напрямую в core/ (домен/Handler)R-SQLA-REPO-X3Только через порт-интерфейс репозитория

Куда дальше

  • Repository pattern в SQLAlchemy — доменный порт (Protocol), SqlAlchemy<X>Repository, передача session в методы; почему репозиторий не вызывает commit.
  • Маппинг ORM ↔ domain в SQLAlchemy — явный маппер to_domain / to_model, сборка агрегата в маппере, почему не __dict__ / vars().
  • PostgreSQL: ACID и уровни изоляции — когда поднимать isolation level выше READ COMMITTED, retry на SQLSTATE 40001, write skew.