Опирается на правила:
R-REP-1…R-REP-5иR-REP-X1…R-REP-X3из DDD Tactical Style Guide → раздел 5. Repository.
Важно знать
- Порт репозитория —
Protocolвcore/<bc>/port/. Домен знает только об интерфейсе, не об SQLAlchemy или PostgreSQL.- Один репозиторий = один корень агрегата (
R-REP-3).OrderRepositoryработает сOrder,CustomerRepository— сCustomer.saveсохраняет агрегат целиком и атомарно записывает события в outbox (в той же транзакции) (R-REP-4).- Методы называются в терминах домена:
by_id,active_by_customer,pending_since(R-REP-5). Неfind_by_id_and_status_not_in_and_created_at_after.- Реализация — в
adapters/out/persistence/. Домен не знает про SQLAlchemy-модели,Row,Session.- Репозиторий возвращает доменный объект (
Order,Customer), не ORM-модель (R-REP-X1). Маппинг в адаптере.Specification, генерирующая SQL, в репозитории — антипаттерн (R-REP-X3). Для read-сценариев — отдельныйViewRepository/ query-метод.- Python-идиома для порта:
typing.Protocolсо структурной типизацией — без наследования от абстрактного класса.
Repository — это доменный объект, скрывающий детали хранения данных. Агрегат не знает, где он живёт: в PostgreSQL, в памяти для тестов или в Redis. Знает только репозиторий. Именно это позволяет тестировать доменную логику без поднятия базы данных. Раскрытие раздела 5 гайда.
Protocol-порт в домене
R-REP-1: порт репозитория объявлен в core/<bc>/port/ как Protocol. Домен зависит от абстракции, не от реализации.
# core/order/port/order_repository.py
from typing import Protocol
from core.order.aggregate.order import Order
from core.order.value_object.order_id import OrderId
from core.order.value_object.customer_id import CustomerId
class OrderRepository(Protocol):
async def by_id(self, order_id: OrderId) -> Order | None: ...
async def save(self, order: Order) -> None: ...
async def active_by_customer(self, customer_id: CustomerId) -> list[Order]: ...
Protocol в Python — структурная типизация. Реализующий класс не обязан явно наследоваться — достаточно иметь совместимые методы. Это позволяет легко подменять реализацию для тестов:
# tests/fakes/fake_order_repository.py
class FakeOrderRepository:
def __init__(self) -> None:
self._store: dict[OrderId, Order] = {}
async def by_id(self, order_id: OrderId) -> Order | None:
return self._store.get(order_id)
async def save(self, order: Order) -> None:
self._store[order.id] = order
async def active_by_customer(self, customer_id: CustomerId) -> list[Order]:
return [
o for o in self._store.values()
if o._customer_id == customer_id and o.status == OrderStatus.NEW
]
FakeOrderRepository удовлетворяет Protocol без наследования — mypy/pyright это проверят.
Реализация в адаптере
R-REP-2: реализация в adapters/out/persistence/. Домен (core/) не импортирует SQLAlchemy.
# adapters/out/persistence/sqlalchemy_order_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from core.order.aggregate.order import Order
from core.order.value_object.order_id import OrderId
from core.order.value_object.customer_id import CustomerId
from adapters.out.persistence.mapper.order_mapper import OrderMapper
from adapters.out.persistence.outbox.outbox_writer import OutboxWriter
from adapters.out.persistence.orm.order_orm import OrderOrm
class SqlAlchemyOrderRepository:
def __init__(
self,
session: AsyncSession,
mapper: OrderMapper,
outbox: OutboxWriter,
) -> None:
self._session = session
self._mapper = mapper
self._outbox = outbox
async def by_id(self, order_id: OrderId) -> Order | None:
stmt = select(OrderOrm).where(OrderOrm.id == order_id.value)
result = await self._session.execute(stmt)
orm_obj = result.scalar_one_or_none()
if orm_obj is None:
return None
return self._mapper.to_domain(orm_obj)
async def save(self, order: Order) -> None:
orm_obj = self._mapper.to_orm(order)
await self._session.merge(orm_obj)
events = order.pull_events()
await self._outbox.write_all(events)
async def active_by_customer(self, customer_id: CustomerId) -> list[Order]:
stmt = (
select(OrderOrm)
.where(
OrderOrm.customer_id == customer_id.value,
OrderOrm.status == "NEW",
)
)
result = await self._session.execute(stmt)
return [self._mapper.to_domain(row) for row in result.scalars()]
OrderMapper — отдельный класс, который переводит Order ↔ OrderOrm. Репозиторий не делает маппинг сам.
save — атомарно агрегат + события в outbox
R-REP-4: save сохраняет агрегат и пишет события в outbox в одной транзакции. Атомарность гарантирует, что не будет «агрегат сохранён, события потеряны» и наоборот.
# adapters/out/persistence/sqlalchemy_order_repository.py
async def save(self, order: Order) -> None:
# session.begin() управляется снаружи (Unit of Work / зависимость FastAPI)
orm_obj = self._mapper.to_orm(order)
await self._session.merge(orm_obj) # upsert агрегата
events = order.pull_events()
await self._outbox.write_all(events) # события в той же транзакции
# коммит — на уровне UoW, не здесь
pull_events() возвращает список и очищает внутреннее состояние агрегата. Если транзакция откатится, следующий вызов save (после retry Handler-а) снова вернёт те же события — они остались в агрегате, потому что pull_events был вызван до коммита, а save в целом не завершился.
Методы в терминах домена
R-REP-5: имена методов — это доменные операции, не SQL-запросы.
# ПЛОХО — имена в терминах SQL/таблицы
async def find_by_status_and_created_after(self, status: str, date: datetime): ...
async def update_status_in_db(self, order_id: UUID, status: str): ...
async def get_all(self): ...
# ХОРОШО — имена в терминах домена
async def by_id(self, order_id: OrderId) -> Order | None: ...
async def active_by_customer(self, customer_id: CustomerId) -> list[Order]: ...
async def pending_since(self, since: datetime) -> list[Order]: ...
pending_since — бизнес-понятие «заказы, ожидающие обработки дольше X». Реализация внутри может быть любой — WHERE status = 'NEW' AND created_at < :since.
ORM-модель не выходит за пределы адаптера
R-REP-X1: репозиторий возвращает доменный Order, не OrderOrm. Handler и агрегат не должны знать про SQLAlchemy.
# ПЛОХО — ORM-объект наружу
async def by_id(self, order_id: OrderId) -> OrderOrm | None: # ORM-тип в сигнатуре
...
# ХОРОШО — доменный объект через маппер
async def by_id(self, order_id: OrderId) -> Order | None:
orm_obj = await self._find_orm(order_id)
return self._mapper.to_domain(orm_obj) if orm_obj else None
Specification как SQL-builder — антипаттерн
R-REP-X3: Specification, генерирующая SQL-условия, в репозитории смешивает domain-side и query-side.
# ПЛОХО — Specification строит SQL
class ActiveOrderSpec:
def to_where_clause(self) -> str:
return "status = 'NEW'" # SQL в домене — нарушение R-MOD-2
# Репозиторий принимает Specification для построения запроса
async def find(self, spec: ActiveOrderSpec) -> list[Order]: ...
Для read-сценариев с фильтрацией — отдельный ViewRepository или query-метод. Доменная Specification работает в памяти (is_satisfied_by), не генерирует SQL.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Возвращать ORM-модель/Row из репозитория | R-REP-X1 | Маппер в адаптере, домен получает Order |
Методы под одну таблицу (update_status_in_db) | R-REP-X2 | Методы в терминах домена: confirm, cancel |
Specification, строящая SQL, в репозитории | R-REP-X3 | ViewRepository / отдельный query-метод для read |
Реализация репозитория в core/ | R-REP-2 | Только в adapters/out/persistence/ |
| Несколько агрегатов в одном репозитории | R-REP-3 | Один репозиторий = один корень |
Куда дальше
- DDD Tactical → раздел 5. Repository — нормативные формулировки
R-REP-*. - python/aggregate-root.md —
pull_events()в корне агрегата. - python/domain-event.md — как события публикуются через Outbox.
- python/module-structure.md — где живут порты и реализации в структуре проекта.
- python/specification.md — доменная Specification vs. SQL-фильтрация.