Опирается на правила:
R-SQLA-SESS-1…R-SQLA-SESS-4иR-SQLA-SESS-X1…R-SQLA-SESS-X3из SQLAlchemy Style Guide → раздел 4. Сессия и транзакции.
Важно знать
- Граница транзакции — на Handler через Unit of Work (
async with uow: ... await uow.commit()), не в репозитории и не в функции запроса.AsyncSessionper-request: создаётся черезDepends/контейнер, не хранится как атрибут модуля или глобальная переменная.expire_on_commit=True(дефолт SQLAlchemy) + обращение к ORM-объекту послеcommitв async =MissingGreenlet. Маппи в доменный объект доcommit.structlog.contextvars.bind_contextvars(step=..., use_case=..., aggregate_id=...)вызывается до шагов — исключение из UoW залогирует edge-handler с полным контекстом.- Исключение не глотать: транзакция откатывается автоматически через
session.begin(), исключение всплывает на edge-handler для маппинга вproblem+json.- Read-методы работают без
commit; запись — только через UoW-команду.- Несколько репозиториев внутри одного Handler'а получают одну сессию — все операции в одной транзакции.
Транзакция — это граница атомарного куска работы с базой. В UCP бизнес-операция = use case (ConfirmOrder, ProcessPayment, RegisterCustomer). Use case реализуется одним Handler'ом. Поэтому транзакция = Handler. Это снимает вопрос «где граница» и даёт единое правило: если нужна транзакция — она открывается в Handler через Unit of Work.
В Python нет аннотаций вроде @Transactional — транзакция открывается явно через session.begin() или через UoW-контекстный менеджер. Это делает границу видимой: читая Handler, сразу понятно, где начинается и заканчивается атомарный кусок работы.
Unit of Work на Handler'е
R-SQLA-SESS-1: граница транзакции — на Handler. UoW оборачивает сессию и предоставляет commit(); Handler не вызывает session.commit() напрямую.
# core/order/handler/confirm_order_handler.py
class ConfirmOrderHandler:
def __init__(
self,
session_factory: AsyncSessionFactory,
orders: OrderRepository,
outbox: OutboxRepository,
) -> None:
self._session_factory = session_factory
self._orders = orders
self._outbox = outbox
async def handle(self, cmd: ConfirmOrder) -> OrderId:
structlog.contextvars.bind_contextvars(
use_case="ConfirmOrder",
aggregate_id=str(cmd.order_id),
)
async with self._session_factory() as session, session.begin():
structlog.contextvars.bind_contextvars(step="load")
order = await self._orders.get(session, cmd.order_id)
structlog.contextvars.bind_contextvars(step="confirm")
order.confirm()
structlog.contextvars.bind_contextvars(step="save")
await self._orders.save(session, order)
structlog.contextvars.bind_contextvars(step="outbox")
self._outbox.add(session, order.pull_events())
return order.id
async with session.begin() — контекстный менеджер, который открывает транзакцию и при выходе без исключения делает commit, при исключении — rollback. session_factory — это async_sessionmaker, проброшенный через DI.
Что это даёт:
- Чёткая граница. Весь Handler — одна транзакция.
Orderиoutbox-запись либо сохраняются вместе, либо не сохраняются вовсе. - Нет частичных эффектов. Если на шаге
outboxупало исключение —order.saveоткатится автоматически. Нет состояния «заказ подтверждён, событие не записано». - Контекст в логах.
bind_contextvarsперед шагом означает, что исключение, всплывающее изsession.begin(), несёт информацию о том, на каком шаге произошёл сбой.
AsyncSession per-request
R-SQLA-SESS-2: сессия создаётся per-request. Не глобально, не на уровне модуля.
# adapters/in/api/deps.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
AsyncSessionFactory = async_sessionmaker[AsyncSession]
async def get_session(
session_factory: Annotated[AsyncSessionFactory, Depends(get_session_factory)],
) -> AsyncGenerator[AsyncSession, None]:
async with session_factory() as session:
yield session
# adapters/in/api/order_router.py
@router.post("/orders/{order_id}/confirm", status_code=200)
async def confirm_order(
order_id: UUID,
session: Annotated[AsyncSession, Depends(get_session)],
handler: Annotated[ConfirmOrderHandler, Depends(get_confirm_order_handler)],
) -> OrderConfirmedResponse:
result = await handler.handle(ConfirmOrder(order_id=order_id))
return OrderConfirmedResponse(order_id=result)
async_sessionmaker настраивается один раз при старте приложения и хранится как синглтон. Каждый запрос получает свою сессию — изолированную, с собственным connection из пула. Когда запрос завершён (нормально или с исключением), async with session_factory() закрывает сессию и возвращает connection в пул.
Глобальная сессия или сессия на уровне модуля — типичная ошибка при переносе кода из синхронного SQLAlchemy. В async-окружении это приведёт к гонке состояний: два запроса будут делить одну сессию, транзакции перепутаются.
Read-only запросы — без commit
R-SQLA-SESS-3: read-методы работают без commit. Репозиторий для чтения не входит в UoW-команду.
# core/order/handler/get_order_handler.py
class GetOrderHandler:
def __init__(
self,
session_factory: AsyncSessionFactory,
orders: OrderViewRepository,
) -> None:
self._session_factory = session_factory
self._orders = orders
async def handle(self, query: GetOrder) -> OrderView:
async with self._session_factory() as session:
return await self._orders.find_by_id(session, query.order_id)
Read-handler не открывает явную транзакцию: один запрос не требует изоляции от других читателей и не должен держать соединение под транзакцией дольше нужного. Если нужен консистентный снимок (читаем несколько таблиц, важна единая точка видимости данных), открываем session.begin() явно:
async with self._session_factory() as session:
async with session.begin():
order = await self._orders.find_by_id(session, query.order_id)
customer = await self._customers.find_by_id(session, order.customer_id)
Пример: страница заказа в Сбере — агрегат Order + профиль Customer для отображения. Без транзакции два запроса могут увидеть разное состояние Customer, если между ними прошло обновление.
Контекст шага в транзакции
R-SQLA-SESS-4: перед каждым шагом внутри session.begin() привязываем шаг к structlog.contextvars.
async def handle(self, cmd: ProcessPayment) -> PaymentId:
structlog.contextvars.bind_contextvars(
use_case="ProcessPayment",
aggregate_id=str(cmd.payment_id),
)
async with self._session_factory() as session, session.begin():
structlog.contextvars.bind_contextvars(step="load_order")
order = await self._orders.get(session, cmd.order_id)
structlog.contextvars.bind_contextvars(step="load_product")
product = await self._products.get(session, order.product_id)
structlog.contextvars.bind_contextvars(step="apply_payment")
order.apply_payment(cmd.amount, product)
structlog.contextvars.bind_contextvars(step="save_order")
await self._orders.save(session, order)
structlog.contextvars.bind_contextvars(step="write_outbox")
self._outbox.add(session, order.pull_events())
return PaymentId(cmd.payment_id)
Почему контекст привязывается до шага, а не в except:
- Исключение из
session.begin()автоматически откатывает транзакцию и всплывает наверх. bind_contextvars— это thread-local / contextvars запись: она живёт весь request-цикл.- Централизованный edge-handler (FastAPI exception handler) логирует исключение с полным контекстом:
step,use_case,aggregate_id,exc_info. - Не нужен
try/exceptв Handler ради лога — это как разR-SQLA-SESS-X3.
Для многошаговых саг с компенсацией допустим явный log.exception("step_failed", step=...) перед шагом компенсации — но только с re-raise или переводом в терминальное состояние агрегата.
Несколько репозиториев — одна сессия
Типовой сценарий: оформление заказа резервирует остатки Product и создаёт Order атомарно.
# core/order/handler/place_order_handler.py
class PlaceOrderHandler:
def __init__(
self,
session_factory: AsyncSessionFactory,
orders: OrderRepository,
products: ProductRepository,
outbox: OutboxRepository,
) -> None:
self._session_factory = session_factory
self._orders = orders
self._products = products
self._outbox = outbox
async def handle(self, cmd: PlaceOrder) -> OrderId:
structlog.contextvars.bind_contextvars(
use_case="PlaceOrder",
aggregate_id=str(cmd.product_id),
)
async with self._session_factory() as session, session.begin():
structlog.contextvars.bind_contextvars(step="load_product")
product = await self._products.get(session, cmd.product_id)
structlog.contextvars.bind_contextvars(step="reserve")
product.reserve(cmd.quantity)
await self._products.save(session, product)
structlog.contextvars.bind_contextvars(step="create_order")
order = Order.create(cmd.customer_id, product, cmd.quantity)
await self._orders.save(session, order)
structlog.contextvars.bind_contextvars(step="outbox")
self._outbox.add(session, order.pull_events())
return order.id
Все три репозитория получают одну и ту же session. Если self._orders.save упадёт с нарушением уникального ключа — product.reserve откатится вместе с ним. Нет состояния «остатки списаны, заказ не создан».
Передача session явным параметром в каждый метод репозитория (R-SQLA-REPO-3) — не архаика: это единственный способ гарантировать, что все операции в одной транзакции без глобального состояния.
expire_on_commit и маппинг до commit
R-SQLA-SESS-X2: SQLAlchemy по умолчанию инвалидирует атрибуты ORM-объекта после commit (expire_on_commit=True). В async-окружении обращение к инвалидированному атрибуту поднимает MissingGreenlet — нет текущего цикла событий для выполнения ленивой загрузки.
# ПЛОХО — чтение атрибута ORM-модели после commit
async with session.begin():
order_model = await session.get(OrderModel, order_id)
order_model.status = "confirmed"
return order_model.id
# ХОРОШО — маппинг в доменный объект внутри transaction
async with self._session_factory() as session, session.begin():
order_model = await session.get(OrderModel, order_id)
order = mapper.to_domain(order_model)
order.confirm()
await session.merge(mapper.to_model(order))
return order.id
Стандартное решение — работать с доменными объектами внутри транзакции, не с ORM-моделями. После commit в Handler возвращается доменный объект или его идентификатор — они не связаны с сессией.
Если нужно отключить expire: async_sessionmaker(expire_on_commit=False) — допустимо, но тогда нужно самостоятельно следить за тем, что данные после commit могут быть устаревшими.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
commit() / rollback() внутри репозитория | R-SQLA-SESS-X1 | Граница TX на Handler через session.begin() |
expire_on_commit=True + обращение к ORM-объекту после commit в async | R-SQLA-SESS-X2 | Маппинг в доменный объект до commit; или expire_on_commit=False |
try: ... except: pass / log без re-raise внутри транзакции | R-SQLA-SESS-X3 | Не глотать: исключение откатывает TX автоматически, edge-handler логирует с contextvars |
Ручной commit между шагами Handler'а (partial commit) | R-SQLA-SESS-X3 | Один session.begin() на Handler = одна атомарная транзакция |
Глобальная / модульная AsyncSession | R-SQLA-SESS-2 | async_sessionmaker per-request через Depends |
select(...) / session.execute(...) напрямую в core/ (домен/Handler) | R-SQLA-REPO-X3 | Только через порт-интерфейс репозитория |
Куда дальше
- Repository pattern в SQLAlchemy — доменный порт (
Protocol),SqlAlchemy<X>Repository, передачаsessionв методы; почему репозиторий не вызываетcommit. - Маппинг ORM ↔ domain в SQLAlchemy — явный маппер
to_domain/to_model, сборка агрегата в маппере, почему не__dict__/vars(). - PostgreSQL: ACID и уровни изоляции — когда поднимать isolation level выше
READ COMMITTED, retry наSQLSTATE 40001, write skew.