Опирается на правила: R-SQLA-REPO-1R-SQLA-REPO-4, R-SQLA-REPO-X1R-SQLA-REPO-X3, R-SQLA-MAP-1R-SQLA-MAP-2, R-SQLA-MAP-X1, R-SQLA-MODEL-1R-SQLA-MODEL-3, R-SQLA-MODEL-X1 из SQLAlchemy Style Guide → раздел 1. Repository-pattern.

Важно знать

  • Repository — это две сущности: Protocol в core/<bc>/port/ и SqlAlchemy<X>Repository в adapters/out/persistence/. Домен зависит только от протокола.
  • На вход и выход публичных методов — доменные объекты (агрегат, Value Object, read-DTO). ORM-модели (DeclarativeBase-наследники) остаются внутри адаптера.
  • AsyncSession инжектируется извне — через DI-контейнер или Unit of Work; не создаётся внутри репозитория.
  • ORM-модель, доменный агрегат и Pydantic-DTO — три разных типа. Не смешивать.
  • В репозитории нет бизнес-логики: никаких if order.status == ..., никаких событий, никаких HTTP-вызовов.
  • Граница транзакции — на Handler через Unit of Work, не в репозитории. commit()/rollback() внутри репозитория запрещены.
  • Каждый репозиторий покрыт интеграционным тестом против Testcontainers Postgres — без мок-сессий.

Repository в этом подходе — граница между domain-слоем и persistence-слоем. Domain знает «как устроен Order», persistence знает «как достать Order из PostgreSQL». Всё, что выходит из adapters/out/persistence/, уже превращено в доменный объект; ORM-типы внутрь домена не проникают, бизнес-логика в persistence не проникает.

Это не зеркало jOOQ-статьи: SQLAlchemy предлагает другие идиомы — DeclarativeBase + Mapped/mapped_column вместо generated Records, select(...) + await session.execute(...) вместо DSLContext, Protocol вместо Java-interface, явные функции-маппера вместо RecordMapper-бинов.

Доменный порт отдельно от реализации

R-SQLA-REPO-1: Protocol в core/<bc>/port/, реализация в adapters/out/persistence/.

# core/order/port/order_repository.py
from typing import Protocol
from uuid import UUID
from core.order.domain import Order

class OrderRepository(Protocol):
    async def find_by_id(self, session: "AsyncSession", order_id: UUID) -> Order | None: ...
    async def save(self, session: "AsyncSession", order: Order) -> None: ...
    async def list_by_customer(
        self, session: "AsyncSession", customer_id: UUID, limit: int, offset: int
    ) -> list[Order]: ...
# adapters/out/persistence/order_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from adapters.out.persistence.models import OrderModel
from adapters.out.persistence.order_mapper import to_domain, to_model
from core.order.domain import Order
from uuid import UUID

class SqlAlchemyOrderRepository:
    async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
        result = await session.execute(
            select(OrderModel).where(OrderModel.id == order_id)
        )
        model = result.scalar_one_or_none()
        return to_domain(model) if model else None

    async def save(self, session: AsyncSession, order: Order) -> None:
        model = to_model(order)
        await session.merge(model)

Handler инжектирует OrderRepository (Protocol), не SqlAlchemyOrderRepository напрямую — это и есть смысл разделения. Если появится RedisOrderRepository (cache-aside), handler не меняется.

AsyncSession передаётся параметром в каждый метод: Handler открывает сессию через UoW, передаёт в репозиторий — и тем самым явно контролирует, что несколько методов работают в одной транзакции.

ORM-модель — только в adapters/

R-SQLA-MODEL-1: DeclarativeBase-наследник живёт в adapters/out/persistence/, не в core/.

# adapters/out/persistence/models.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import Numeric, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from datetime import datetime
from decimal import Decimal

class Base(DeclarativeBase):
    pass

class OrderModel(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    customer_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False)
    total: Mapped[Decimal] = mapped_column(Numeric(19, 4), nullable=False)
    status: Mapped[str] = mapped_column(nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)

    items: Mapped[list["OrderItemModel"]] = relationship(
        "OrderItemModel", lazy="raise", back_populates="order"
    )

class OrderItemModel(Base):
    __tablename__ = "order_items"

    id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    order_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), ForeignKey("orders.id"), nullable=False)
    product_id: Mapped[uuid.UUID] = mapped_column(PG_UUID(as_uuid=True), nullable=False)
    quantity: Mapped[int] = mapped_column(nullable=False)
    price: Mapped[Decimal] = mapped_column(Numeric(19, 4), nullable=False)

    order: Mapped["OrderModel"] = relationship("OrderModel", back_populates="items")

Типы по R-SQLA-MODEL-2: деньги — Numeric(p, s)Decimal (не float); время — DateTime(timezone=True) → aware datetime; UUID — UUID(as_uuid=True).

lazy="raise" на relationship — обязательно в async-контексте. SQLAlchemy async не поддерживает ленивую загрузку: попытка обратиться к order.items без предварительного selectinload вызывает MissingGreenlet. lazy="raise" делает такую ошибку явной и немедленной, а не случайным детектом в runtime.

ORM-модель — анемичная persistence-структура. Никаких методов model.confirm(), никаких инвариантов — R-SQLA-MODEL-X1. Доменная логика живёт в агрегате Order.

Маппер рядом с репозиторием

R-SQLA-MAP-1: явные функции to_domain(model) -> Aggregate и to_model(aggregate) -> Model в отдельном файле рядом с репозиторием.

# adapters/out/persistence/order_mapper.py
from adapters.out.persistence.models import OrderModel, OrderItemModel
from core.order.domain import Order, OrderItem
from uuid import UUID

def to_domain(model: OrderModel) -> Order:
    return Order(
        id=model.id,
        customer_id=model.customer_id,
        total=model.total,
        status=model.status,
        created_at=model.created_at,
        items=[_item_to_domain(i) for i in model.items],
    )

def _item_to_domain(item: OrderItemModel) -> OrderItem:
    return OrderItem(
        id=item.id,
        product_id=item.product_id,
        quantity=item.quantity,
        price=item.price,
    )

def to_model(order: Order) -> OrderModel:
    model = OrderModel(
        id=order.id,
        customer_id=order.customer_id,
        total=order.total,
        status=order.status,
        created_at=order.created_at,
    )
    model.items = [_item_to_model(i) for i in order.items]
    return model

def _item_to_model(item: OrderItem) -> OrderItemModel:
    return OrderItemModel(
        id=item.id,
        product_id=item.product_id,
        quantity=item.quantity,
        price=item.price,
    )

R-SQLA-MAP-2: сборка агрегата из строк — в маппере, не размазана по репозиторию. Репозиторий вызывает to_domain(model) и возвращает результат — не собирает Order кусками у себя.

R-SQLA-MAP-X1: никакого __dict__/vars() для маппинга, никакого «ORM-модель и есть доменный объект». Три типа — три роли, между ними только явные функции.

Eager-load связей

R-SQLA-QRY-2: в async lazy="raise" на всех relationship. Загрузку включаем явно там, где нужно.

from sqlalchemy.orm import selectinload

class SqlAlchemyOrderRepository:
    async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
        result = await session.execute(
            select(OrderModel)
            .options(selectinload(OrderModel.items))
            .where(OrderModel.id == order_id)
        )
        model = result.scalar_one_or_none()
        return to_domain(model) if model else None

    async def list_by_customer(
        self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
    ) -> list[Order]:
        result = await session.execute(
            select(OrderModel)
            .options(selectinload(OrderModel.items))
            .where(OrderModel.customer_id == customer_id)
            .order_by(OrderModel.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return [to_domain(m) for m in result.scalars().all()]

selectinload генерирует отдельный SELECT ... WHERE order_id IN (...) — один запрос на все items, не N запросов по одному. joinedload как альтернатива дублирует строки заказа в flat-результате и требует дедупликации; для коллекций selectinload чище.

Читаемость сохраняется: select(Model).options(...).where(...).order_by(...).limit(...).offset(...) — цепочка как SQL.

Сессия через Unit of Work

R-SQLA-REPO-3, R-SQLA-SESS-1: AsyncSession инжектируется снаружи, граница транзакции — на Handler.

# core/order/handler/confirm_order_handler.py
import structlog
import structlog.contextvars
from core.order.port import OrderRepository
from core.outbox.port import OutboxPort
from core.order.command import ConfirmOrder

class ConfirmOrderHandler:
    def __init__(self, orders: OrderRepository, outbox: OutboxPort, session_factory) -> None:
        self._orders = orders
        self._outbox = outbox
        self._session_factory = session_factory

    async def handle(self, cmd: ConfirmOrder) -> None:
        structlog.contextvars.bind_contextvars(use_case="ConfirmOrder", aggregate_id=str(cmd.order_id))
        async with self._session_factory() as session, session.begin():
            structlog.contextvars.bind_contextvars(step="load")
            order = await self._orders.find_by_id(session, cmd.order_id)
            if order is None:
                raise OrderNotFoundError(cmd.order_id)
            structlog.contextvars.bind_contextvars(step="confirm")
            order.confirm()
            structlog.contextvars.bind_contextvars(step="save")
            await self._orders.save(session, order)
            structlog.contextvars.bind_contextvars(step="outbox")
            self._outbox.add(session, order.pull_events())

session.begin() — контекст-менеджер, при выходе без исключения вызывает commit(), при исключении — rollback(). Репозиторий не знает ни о каком commit.

structlog.contextvars.bind_contextvars(step=...) перед каждым шагом — это R-SQLA-SESS-4. Если на шаге "save" упадёт, edge-handler залогирует исключение с контекстом {use_case, aggregate_id, step}. Не нужно ловить исключение внутри handler'а ради лога.

R-SQLA-SESS-X2: если session.expire_on_commit=True (дефолт SQLAlchemy), обращение к атрибутам ORM-модели после commit в async вызывает MissingGreenlet. Решение — маппить в доменный объект до commit, не держать ORM-модель живой после границы транзакции. Репозиторий уже возвращает доменный объект, поэтому проблема не возникает.

Бизнес-логики в репозитории нет

R-SQLA-REPO-X2: ни if order.status == ..., ни вызовов внешних сервисов, ни публикации событий.

# ПЛОХО
async def save(self, session: AsyncSession, order: Order) -> None:
    if order.status == "CANCELLED":
        await self._notification_client.send("order cancelled")
    if order.total <= 0:
        raise ValueError("total must be positive")
    model = to_model(order)
    await session.merge(model)

Notification-вызов внутри транзакции выполнится, даже если транзакция потом откатится — классическая дыра. Для этого существует Outbox-pattern на Handler'е.

Инвариант total > 0 должен проверяться в конструкторе/фабрике Order. Если Order дошёл до репозитория — он уже валиден.

Что разрешено в репозитории: select/insert/merge, вызов маппера, selectinload для eager-load. Всё.

Что запрещено

АнтипаттернПравилоЧто взамен
Возврат ORM-модели (OrderModel) из метода портаR-SQLA-REPO-X1Доменный тип Order через to_domain
Бизнес-логика внутри репозитория (if order.status == ...)R-SQLA-REPO-X2Логика в доменных методах агрегата
AsyncSession или select(...) прямо в core/R-SQLA-REPO-X3Только через port.OrderRepository (Protocol)
commit()/rollback() внутри репозиторияR-SQLA-SESS-X1Граница TX на Handler через UoW
ORM-модель как доменный агрегат (OrderModel в core/)R-SQLA-MODEL-1, R-SQLA-MAP-X1Три типа: ORM-модель, агрегат, Pydantic-DTO
__dict__ / vars() для маппингаR-SQLA-MAP-X1Явные функции to_domain / to_model
lazy="select" на relationship в asyncR-SQLA-QRY-X2lazy="raise" + явный selectinload там, где нужно
session.query(...) (SQLAlchemy 1.x стиль)R-SQLA-QRY-X1select(...) + await session.execute(...)
Доменная логика на ORM-моделиR-SQLA-MODEL-X1Инварианты в доменном агрегате

Куда дальше

  • Транзакции в SQLAlchemy — Unit of Work на handler'е — как Handler открывает session.begin(), почему commit не в репозитории, обработка ошибок TX.
  • ORM-модели и маппинг — DeclarativeBase, Mapped/mapped_column, типы PostgreSQL через SQLAlchemy, R-SQLA-MODEL-* и R-SQLA-MAP-* подробно.
  • Запросы и пагинация — select(...) 2.0-style, selectinload vs joinedload, bulk-вставка, пагинация через limit/offset и keyset.
  • PostgreSQL: ACID и уровни изоляции — теория за session.begin() и когда менять уровень изоляции.