Опирается на правила:
R-SQLA-REPO-1…R-SQLA-REPO-4,R-SQLA-REPO-X1…R-SQLA-REPO-X3,R-SQLA-MAP-1…R-SQLA-MAP-2,R-SQLA-MAP-X1,R-SQLA-MODEL-1…R-SQLA-MODEL-3,R-SQLA-MODEL-X1из SQLAlchemy Style Guide → раздел 1. Repository-pattern.
Важно знать
- Repository — это две сущности:
Protocolвcore/<bc>/port/иSqlAlchemy<X>Repositoryвadapters/out/persistence/. Домен зависит только от протокола.- На вход и выход публичных методов — доменные объекты (агрегат, Value Object, read-DTO). ORM-модели (
DeclarativeBase-наследники) остаются внутри адаптера.AsyncSessionинжектируется извне — через DI-контейнер или Unit of Work; не создаётся внутри репозитория.- ORM-модель, доменный агрегат и Pydantic-DTO — три разных типа. Не смешивать.
- В репозитории нет бизнес-логики: никаких
if order.status == ..., никаких событий, никаких HTTP-вызовов.- Граница транзакции — на Handler через Unit of Work, не в репозитории.
commit()/rollback()внутри репозитория запрещены.- Каждый репозиторий покрыт интеграционным тестом против Testcontainers Postgres — без мок-сессий.
Repository в этом подходе — граница между domain-слоем и persistence-слоем. Domain знает «как устроен Order», persistence знает «как достать Order из PostgreSQL». Всё, что выходит из adapters/out/persistence/, уже превращено в доменный объект; ORM-типы внутрь домена не проникают, бизнес-логика в persistence не проникает.
Это не зеркало jOOQ-статьи: SQLAlchemy предлагает другие идиомы — DeclarativeBase + Mapped/mapped_column вместо generated Records, select(...) + await session.execute(...) вместо DSLContext, Protocol вместо Java-interface, явные функции-маппера вместо RecordMapper-бинов.
Доменный порт отдельно от реализации
R-SQLA-REPO-1: Protocol в core/<bc>/port/, реализация в adapters/out/persistence/.
# core/order/port/order_repository.py
from typing import Protocol
from uuid import UUID
from core.order.domain import Order
class OrderRepository(Protocol):
async def find_by_id(self, session: "AsyncSession", order_id: UUID) -> Order | None: ...
async def save(self, session: "AsyncSession", order: Order) -> None: ...
async def list_by_customer(
self, session: "AsyncSession", customer_id: UUID, limit: int, offset: int
) -> list[Order]: ...
# adapters/out/persistence/order_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from adapters.out.persistence.models import OrderModel
from adapters.out.persistence.order_mapper import to_domain, to_model
from core.order.domain import Order
from uuid import UUID
class SqlAlchemyOrderRepository:
async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
result = await session.execute(
select(OrderModel).where(OrderModel.id == order_id)
)
model = result.scalar_one_or_none()
return to_domain(model) if model else None
async def save(self, session: AsyncSession, order: Order) -> None:
model = to_model(order)
await session.merge(model)
Handler инжектирует OrderRepository (Protocol), не SqlAlchemyOrderRepository напрямую — это и есть смысл разделения. Если появится RedisOrderRepository (cache-aside), handler не меняется.
AsyncSession передаётся параметром в каждый метод: Handler открывает сессию через UoW, передаёт в репозиторий — и тем самым явно контролирует, что несколько методов работают в одной транзакции.
ORM-модель — только в adapters/
R-SQLA-MODEL-1: DeclarativeBase-наследник живёт в adapters/out/persistence/, не в core/.
# adapters/out/persistence/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import Numeric, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from datetime import datetime
from decimal import Decimal
class Base(DeclarativeBase):
pass
class OrderModel(Base):
__tablename__ = "orders"
id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
customer_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False)
total: Mapped[Decimal] = mapped_column(Numeric(19, 4), nullable=False)
status: Mapped[str] = mapped_column(nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
items: Mapped[list["OrderItemModel"]] = relationship(
"OrderItemModel", lazy="raise", back_populates="order"
)
class OrderItemModel(Base):
__tablename__ = "order_items"
id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
order_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), ForeignKey("orders.id"), nullable=False)
product_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False)
quantity: Mapped[int] = mapped_column(nullable=False)
price: Mapped[Decimal] = mapped_column(Numeric(19, 4), nullable=False)
order: Mapped["OrderModel"] = relationship("OrderModel", back_populates="items")
Типы по R-SQLA-MODEL-2: деньги — Numeric(p, s) → Decimal (не float); время — DateTime(timezone=True) → aware datetime; UUID — UUID(as_uuid=True).
lazy="raise" на relationship — обязательно в async-контексте. SQLAlchemy async не поддерживает ленивую загрузку: попытка обратиться к order.items без предварительного selectinload вызывает MissingGreenlet. lazy="raise" делает такую ошибку явной и немедленной, а не случайным детектом в runtime.
ORM-модель — анемичная persistence-структура. Никаких методов model.confirm(), никаких инвариантов — R-SQLA-MODEL-X1. Доменная логика живёт в агрегате Order.
Маппер рядом с репозиторием
R-SQLA-MAP-1: явные функции to_domain(model) -> Aggregate и to_model(aggregate) -> Model в отдельном файле рядом с репозиторием.
# adapters/out/persistence/order_mapper.py
from adapters.out.persistence.models import OrderModel, OrderItemModel
from core.order.domain import Order, OrderItem
from uuid import UUID
def to_domain(model: OrderModel) -> Order:
return Order(
id=model.id,
customer_id=model.customer_id,
total=model.total,
status=model.status,
created_at=model.created_at,
items=[_item_to_domain(i) for i in model.items],
)
def _item_to_domain(item: OrderItemModel) -> OrderItem:
return OrderItem(
id=item.id,
product_id=item.product_id,
quantity=item.quantity,
price=item.price,
)
def to_model(order: Order) -> OrderModel:
model = OrderModel(
id=order.id,
customer_id=order.customer_id,
total=order.total,
status=order.status,
created_at=order.created_at,
)
model.items = [_item_to_model(i) for i in order.items]
return model
def _item_to_model(item: OrderItem) -> OrderItemModel:
return OrderItemModel(
id=item.id,
product_id=item.product_id,
quantity=item.quantity,
price=item.price,
)
R-SQLA-MAP-2: сборка агрегата из строк — в маппере, не размазана по репозиторию. Репозиторий вызывает to_domain(model) и возвращает результат — не собирает Order кусками у себя.
R-SQLA-MAP-X1: никакого __dict__/vars() для маппинга, никакого «ORM-модель и есть доменный объект». Три типа — три роли, между ними только явные функции.
Eager-load связей
R-SQLA-QRY-2: в async lazy="raise" на всех relationship. Загрузку включаем явно там, где нужно.
from sqlalchemy.orm import selectinload
class SqlAlchemyOrderRepository:
async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
result = await session.execute(
select(OrderModel)
.options(selectinload(OrderModel.items))
.where(OrderModel.id == order_id)
)
model = result.scalar_one_or_none()
return to_domain(model) if model else None
async def list_by_customer(
self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
) -> list[Order]:
result = await session.execute(
select(OrderModel)
.options(selectinload(OrderModel.items))
.where(OrderModel.customer_id == customer_id)
.order_by(OrderModel.created_at.desc())
.limit(limit)
.offset(offset)
)
return [to_domain(m) for m in result.scalars().all()]
selectinload генерирует отдельный SELECT ... WHERE order_id IN (...) — один запрос на все items, не N запросов по одному. joinedload как альтернатива дублирует строки заказа в flat-результате и требует дедупликации; для коллекций selectinload чище.
Читаемость сохраняется: select(Model).options(...).where(...).order_by(...).limit(...).offset(...) — цепочка как SQL.
Сессия через Unit of Work
R-SQLA-REPO-3, R-SQLA-SESS-1: AsyncSession инжектируется снаружи, граница транзакции — на Handler.
# core/order/handler/confirm_order_handler.py
import structlog
import structlog.contextvars
from core.order.port import OrderRepository
from core.outbox.port import OutboxPort
from core.order.command import ConfirmOrder
class ConfirmOrderHandler:
def __init__(self, orders: OrderRepository, outbox: OutboxPort, session_factory) -> None:
self._orders = orders
self._outbox = outbox
self._session_factory = session_factory
async def handle(self, cmd: ConfirmOrder) -> None:
structlog.contextvars.bind_contextvars(use_case="ConfirmOrder", aggregate_id=str(cmd.order_id))
async with self._session_factory() as session, session.begin():
structlog.contextvars.bind_contextvars(step="load")
order = await self._orders.find_by_id(session, cmd.order_id)
if order is None:
raise OrderNotFoundError(cmd.order_id)
structlog.contextvars.bind_contextvars(step="confirm")
order.confirm()
structlog.contextvars.bind_contextvars(step="save")
await self._orders.save(session, order)
structlog.contextvars.bind_contextvars(step="outbox")
self._outbox.add(session, order.pull_events())
session.begin() — контекст-менеджер, при выходе без исключения вызывает commit(), при исключении — rollback(). Репозиторий не знает ни о каком commit.
structlog.contextvars.bind_contextvars(step=...) перед каждым шагом — это R-SQLA-SESS-4. Если на шаге "save" упадёт, edge-handler залогирует исключение с контекстом {use_case, aggregate_id, step}. Не нужно ловить исключение внутри handler'а ради лога.
R-SQLA-SESS-X2: если session.expire_on_commit=True (дефолт SQLAlchemy), обращение к атрибутам ORM-модели после commit в async вызывает MissingGreenlet. Решение — маппить в доменный объект до commit, не держать ORM-модель живой после границы транзакции. Репозиторий уже возвращает доменный объект, поэтому проблема не возникает.
Бизнес-логики в репозитории нет
R-SQLA-REPO-X2: ни if order.status == ..., ни вызовов внешних сервисов, ни публикации событий.
# ПЛОХО
async def save(self, session: AsyncSession, order: Order) -> None:
if order.status == "CANCELLED":
await self._notification_client.send("order cancelled")
if order.total <= 0:
raise ValueError("total must be positive")
model = to_model(order)
await session.merge(model)
Notification-вызов внутри транзакции выполнится, даже если транзакция потом откатится — классическая дыра. Для этого существует Outbox-pattern на Handler'е.
Инвариант total > 0 должен проверяться в конструкторе/фабрике Order. Если Order дошёл до репозитория — он уже валиден.
Что разрешено в репозитории: select/insert/merge, вызов маппера, selectinload для eager-load. Всё.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Возврат ORM-модели (OrderModel) из метода порта | R-SQLA-REPO-X1 | Доменный тип Order через to_domain |
Бизнес-логика внутри репозитория (if order.status == ...) | R-SQLA-REPO-X2 | Логика в доменных методах агрегата |
AsyncSession или select(...) прямо в core/ | R-SQLA-REPO-X3 | Только через port.OrderRepository (Protocol) |
commit()/rollback() внутри репозитория | R-SQLA-SESS-X1 | Граница TX на Handler через UoW |
ORM-модель как доменный агрегат (OrderModel в core/) | R-SQLA-MODEL-1, R-SQLA-MAP-X1 | Три типа: ORM-модель, агрегат, Pydantic-DTO |
__dict__ / vars() для маппинга | R-SQLA-MAP-X1 | Явные функции to_domain / to_model |
lazy="select" на relationship в async | R-SQLA-QRY-X2 | lazy="raise" + явный selectinload там, где нужно |
session.query(...) (SQLAlchemy 1.x стиль) | R-SQLA-QRY-X1 | select(...) + await session.execute(...) |
| Доменная логика на ORM-модели | R-SQLA-MODEL-X1 | Инварианты в доменном агрегате |
Куда дальше
- Транзакции в SQLAlchemy — Unit of Work на handler'е — как Handler открывает
session.begin(), почемуcommitне в репозитории, обработка ошибок TX. - ORM-модели и маппинг —
DeclarativeBase,Mapped/mapped_column, типы PostgreSQL через SQLAlchemy,R-SQLA-MODEL-*иR-SQLA-MAP-*подробно. - Запросы и пагинация —
select(...)2.0-style,selectinloadvsjoinedload, bulk-вставка, пагинация черезlimit/offsetи keyset. - PostgreSQL: ACID и уровни изоляции — теория за
session.begin()и когда менять уровень изоляции.