Опирается на правила: R-REP-1R-REP-5 и R-REP-X1R-REP-X3 из DDD Tactical Style Guide → раздел 5. Repository.

Важно знать

  • Порт репозитория — Protocol в core/<bc>/port/. Домен знает только об интерфейсе, не об SQLAlchemy или PostgreSQL.
  • Один репозиторий = один корень агрегата (R-REP-3). OrderRepository работает с Order, CustomerRepository — с Customer.
  • save сохраняет агрегат целиком и атомарно записывает события в outbox (в той же транзакции) (R-REP-4).
  • Методы называются в терминах домена: by_id, active_by_customer, pending_since (R-REP-5). Не find_by_id_and_status_not_in_and_created_at_after.
  • Реализация — в adapters/out/persistence/. Домен не знает про SQLAlchemy-модели, Row, Session.
  • Репозиторий возвращает доменный объект (Order, Customer), не ORM-модель (R-REP-X1). Маппинг в адаптере.
  • Specification, генерирующая SQL, в репозитории — антипаттерн (R-REP-X3). Для read-сценариев — отдельный ViewRepository / query-метод.
  • Python-идиома для порта: typing.Protocol со структурной типизацией — без наследования от абстрактного класса.

Repository — это доменный объект, скрывающий детали хранения данных. Агрегат не знает, где он живёт: в PostgreSQL, в памяти для тестов или в Redis. Знает только репозиторий. Именно это позволяет тестировать доменную логику без поднятия базы данных. Раскрытие раздела 5 гайда.

Protocol-порт в домене

R-REP-1: порт репозитория объявлен в core/<bc>/port/ как Protocol. Домен зависит от абстракции, не от реализации.

# core/order/port/order_repository.py
from typing import Protocol

from core.order.aggregate.order import Order
from core.order.value_object.order_id import OrderId
from core.order.value_object.customer_id import CustomerId


class OrderRepository(Protocol):
    async def by_id(self, order_id: OrderId) -> Order | None: ...
    async def save(self, order: Order) -> None: ...
    async def active_by_customer(self, customer_id: CustomerId) -> list[Order]: ...

Protocol в Python — структурная типизация. Реализующий класс не обязан явно наследоваться — достаточно иметь совместимые методы. Это позволяет легко подменять реализацию для тестов:

# tests/fakes/fake_order_repository.py
class FakeOrderRepository:
    def __init__(self) -> None:
        self._store: dict[OrderId, Order] = {}

    async def by_id(self, order_id: OrderId) -> Order | None:
        return self._store.get(order_id)

    async def save(self, order: Order) -> None:
        self._store[order.id] = order

    async def active_by_customer(self, customer_id: CustomerId) -> list[Order]:
        return [
            o for o in self._store.values()
            if o._customer_id == customer_id and o.status == OrderStatus.NEW
        ]

FakeOrderRepository удовлетворяет Protocol без наследования — mypy/pyright это проверят.

Реализация в адаптере

R-REP-2: реализация в adapters/out/persistence/. Домен (core/) не импортирует SQLAlchemy.

# adapters/out/persistence/sqlalchemy_order_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from core.order.aggregate.order import Order
from core.order.value_object.order_id import OrderId
from core.order.value_object.customer_id import CustomerId
from adapters.out.persistence.mapper.order_mapper import OrderMapper
from adapters.out.persistence.outbox.outbox_writer import OutboxWriter
from adapters.out.persistence.orm.order_orm import OrderOrm


class SqlAlchemyOrderRepository:
    def __init__(
        self,
        session: AsyncSession,
        mapper: OrderMapper,
        outbox: OutboxWriter,
    ) -> None:
        self._session = session
        self._mapper = mapper
        self._outbox = outbox

    async def by_id(self, order_id: OrderId) -> Order | None:
        stmt = select(OrderOrm).where(OrderOrm.id == order_id.value)
        result = await self._session.execute(stmt)
        orm_obj = result.scalar_one_or_none()
        if orm_obj is None:
            return None
        return self._mapper.to_domain(orm_obj)

    async def save(self, order: Order) -> None:
        orm_obj = self._mapper.to_orm(order)
        await self._session.merge(orm_obj)
        events = order.pull_events()
        await self._outbox.write_all(events)

    async def active_by_customer(self, customer_id: CustomerId) -> list[Order]:
        stmt = (
            select(OrderOrm)
            .where(
                OrderOrm.customer_id == customer_id.value,
                OrderOrm.status == "NEW",
            )
        )
        result = await self._session.execute(stmt)
        return [self._mapper.to_domain(row) for row in result.scalars()]

OrderMapper — отдельный класс, который переводит Order ↔ OrderOrm. Репозиторий не делает маппинг сам.

save — атомарно агрегат + события в outbox

R-REP-4: save сохраняет агрегат и пишет события в outbox в одной транзакции. Атомарность гарантирует, что не будет «агрегат сохранён, события потеряны» и наоборот.

# adapters/out/persistence/sqlalchemy_order_repository.py
async def save(self, order: Order) -> None:
    # session.begin() управляется снаружи (Unit of Work / зависимость FastAPI)
    orm_obj = self._mapper.to_orm(order)
    await self._session.merge(orm_obj)      # upsert агрегата
    events = order.pull_events()
    await self._outbox.write_all(events)    # события в той же транзакции
    # коммит — на уровне UoW, не здесь

pull_events() возвращает список и очищает внутреннее состояние агрегата. Если транзакция откатится, следующий вызов save (после retry Handler-а) снова вернёт те же события — они остались в агрегате, потому что pull_events был вызван до коммита, а save в целом не завершился.

Методы в терминах домена

R-REP-5: имена методов — это доменные операции, не SQL-запросы.

# ПЛОХО — имена в терминах SQL/таблицы
async def find_by_status_and_created_after(self, status: str, date: datetime): ...
async def update_status_in_db(self, order_id: UUID, status: str): ...
async def get_all(self): ...

# ХОРОШО — имена в терминах домена
async def by_id(self, order_id: OrderId) -> Order | None: ...
async def active_by_customer(self, customer_id: CustomerId) -> list[Order]: ...
async def pending_since(self, since: datetime) -> list[Order]: ...

pending_since — бизнес-понятие «заказы, ожидающие обработки дольше X». Реализация внутри может быть любой — WHERE status = 'NEW' AND created_at < :since.

ORM-модель не выходит за пределы адаптера

R-REP-X1: репозиторий возвращает доменный Order, не OrderOrm. Handler и агрегат не должны знать про SQLAlchemy.

# ПЛОХО — ORM-объект наружу
async def by_id(self, order_id: OrderId) -> OrderOrm | None:  # ORM-тип в сигнатуре
    ...

# ХОРОШО — доменный объект через маппер
async def by_id(self, order_id: OrderId) -> Order | None:
    orm_obj = await self._find_orm(order_id)
    return self._mapper.to_domain(orm_obj) if orm_obj else None

Specification как SQL-builder — антипаттерн

R-REP-X3: Specification, генерирующая SQL-условия, в репозитории смешивает domain-side и query-side.

# ПЛОХО — Specification строит SQL
class ActiveOrderSpec:
    def to_where_clause(self) -> str:
        return "status = 'NEW'"   # SQL в домене — нарушение R-MOD-2

# Репозиторий принимает Specification для построения запроса
async def find(self, spec: ActiveOrderSpec) -> list[Order]: ...

Для read-сценариев с фильтрацией — отдельный ViewRepository или query-метод. Доменная Specification работает в памяти (is_satisfied_by), не генерирует SQL.

Что запрещено

АнтипаттернПравилоЧто взамен
Возвращать ORM-модель/Row из репозиторияR-REP-X1Маппер в адаптере, домен получает Order
Методы под одну таблицу (update_status_in_db)R-REP-X2Методы в терминах домена: confirm, cancel
Specification, строящая SQL, в репозиторииR-REP-X3ViewRepository / отдельный query-метод для read
Реализация репозитория в core/R-REP-2Только в adapters/out/persistence/
Несколько агрегатов в одном репозиторииR-REP-3Один репозиторий = один корень

Куда дальше

  • DDD Tactical → раздел 5. Repository — нормативные формулировки R-REP-*.
  • python/aggregate-root.md — pull_events() в корне агрегата.
  • python/domain-event.md — как события публикуются через Outbox.
  • python/module-structure.md — где живут порты и реализации в структуре проекта.
  • python/specification.md — доменная Specification vs. SQL-фильтрация.