Опирается на правила: PYTS-9PYTS-11 из Python Test Strategy → раздел 3. DatabasePreparer — fluent setup БД.

Важно знать

  • На каждый Bounded Context — свой <Domain>DatabasePreparer над AsyncSession или Core-соединением.
  • Три группы методов: clear_*() очищает таблицы, create_*(...) вставляет данные, prepare() запускает очередь.
  • Не пересоздаём схему между тестами — только DELETE, схема применяется Liquibase/Alembic при старте контейнера.
  • Порядок вызовов = порядок исполнения — методы добавляют в очередь, prepare() запускает всё разом.
  • FK ordering: clear_* от leaf к root, create_* от root к leaf.
  • Прямой SQLAlchemy Core или raw SQL, не ORM-модели домена — тест не зависит от продуктового слоя.
  • clear_all() вызывается в autouse-фикстуре — чистое состояние перед каждым тестом.

Setup БД в integration-тесте — обычно самая шумная часть: вокруг трёх строк бизнес-логики накапливаются десятки INSERT-ов. DatabasePreparer скрывает этот шум за fluent-интерфейсом и делает Arrange-фазу компактной.

Структура OrderDatabasePreparer

PYTS-9: один препарер на Bounded Context, принимает AsyncSession или AsyncConnection.

from collections.abc import Callable, Coroutine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text


class OrderDatabasePreparer:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session
        self._queue: list[Callable[[], Coroutine]] = []

    def clear_orders(self) -> "OrderDatabasePreparer":
        async def _clear() -> None:
            await self._session.execute(text("DELETE FROM orders"))
        self._queue.append(_clear)
        return self

    def clear_order_items(self) -> "OrderDatabasePreparer":
        async def _clear() -> None:
            await self._session.execute(text("DELETE FROM order_items"))
        self._queue.append(_clear)
        return self

    def clear_outbox(self) -> "OrderDatabasePreparer":
        async def _clear() -> None:
            await self._session.execute(text("DELETE FROM outbox"))
        self._queue.append(_clear)
        return self

    def create_order(self, order: dict) -> "OrderDatabasePreparer":
        async def _insert() -> None:
            await self._session.execute(
                text(
                    "INSERT INTO orders(id, customer_id, status, created_at)"
                    " VALUES(:id, :customer_id, :status, :created_at)"
                ),
                order,
            )
        self._queue.append(_insert)
        return self

    def create_order_item(self, item: dict) -> "OrderDatabasePreparer":
        async def _insert() -> None:
            await self._session.execute(
                text(
                    "INSERT INTO order_items(id, order_id, product_id, amount)"
                    " VALUES(:id, :order_id, :product_id, :amount)"
                ),
                item,
            )
        self._queue.append(_insert)
        return self

    async def clear_all(self) -> None:
        self.clear_order_items().clear_orders().clear_outbox()
        await self.prepare()

    async def prepare(self) -> None:
        for step in self._queue:
            await step()
        self._queue.clear()
        await self._session.commit()

Что важно:

  • Fluent — все методы возвращают self, цепочка читается сверху вниз.
  • Lazy execution — методы добавляют корутины в _queue, не выполняют сразу. prepare() запускает всё.
  • text() — SQLAlchemy Core через параметризованный raw SQL. Не ORM-модели домена.
  • dict на входе — тестовые данные, созданные через ObjectGenerator (см. python/object-generator).

Три группы методов

PYTS-9: clear_* / create_* / prepare.

clear_*() — очистка

def clear_orders(self) -> "OrderDatabasePreparer":
    async def _clear() -> None:
        await self._session.execute(text("DELETE FROM orders"))
    self._queue.append(_clear)
    return self

Чистит конкретную таблицу. clear_all() — convenience-метод, вызывает все clear_* в правильном FK-порядке.

create_*(...) — вставка

def create_order(self, order: dict) -> "OrderDatabasePreparer":
    async def _insert() -> None:
        await self._session.execute(
            text(
                "INSERT INTO orders(id, customer_id, status, created_at)"
                " VALUES(:id, :customer_id, :status, :created_at)"
            ),
            order,
        )
    self._queue.append(_insert)
    return self

Принимает dict с полями строки. dict создаётся через OrderObjectGenerator (все значения с разумными дефолтами, в тесте переопределяем только важное для сценария).

prepare() — запуск

async def prepare(self) -> None:
    for step in self._queue:
        await step()
    self._queue.clear()
    await self._session.commit()

Выполняет очередь по порядку, коммитит сессию, очищает очередь.

Не пересоздаём схему

PYTS-10: только DELETE, не DROP TABLE / metadata.drop_all().

# ПЛОХО — медленно, маскирует миграционные баги
@pytest_asyncio.fixture(autouse=True)
async def reset_db(engine):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

# ХОРОШО — миллисекунды
@pytest_asyncio.fixture(autouse=True)
async def clear_db(preparer: OrderDatabasePreparer):
    await preparer.clear_all()

Различия:

  • drop_all + create_all — несколько секунд на каждый тест; если миграция содержит баг в drop — тест это не найдёт.
  • DELETE — пустые таблицы за миллисекунды; схема применяется Liquibase/Alembic один раз при старте контейнера.

PYTS-X3: Base.metadata.create_all() / drop_all() между тестами запрещены.

FK ordering

PYTS-11: порядок методов = порядок исполнения.

Если в схеме order_items.order_id → orders.id:

# ПЛОХО — FK violation при DELETE
await preparer \
    .clear_orders() \
    .clear_order_items() \
    .prepare()

# ХОРОШО — сначала зависимые
await preparer \
    .clear_order_items() \
    .clear_orders() \
    .prepare()

При создании — наоборот:

# ПЛОХО — order_id не существует
await preparer \
    .create_order_item(item) \
    .create_order(order) \
    .prepare()

# ХОРОШО — сначала родитель
await preparer \
    .create_order(order) \
    .create_order_item(item) \
    .prepare()

Правило: clear от leaf к root, create от root к leaf.

Фикстура препарера и autouse-очистка

PYTS-4, PYTS-9: препарер создаётся как фикстура и разделяет AsyncSession с тестом.

# conftest.py

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from tests.support.order_database_preparer import OrderDatabasePreparer


@pytest_asyncio.fixture
async def db_session(async_engine) -> AsyncSession:
    async with AsyncSession(async_engine) as session:
        yield session


@pytest_asyncio.fixture
async def order_preparer(db_session: AsyncSession) -> OrderDatabasePreparer:
    return OrderDatabasePreparer(db_session)


@pytest_asyncio.fixture(autouse=True)
async def clear_db(order_preparer: OrderDatabasePreparer) -> None:
    await order_preparer.clear_all()

autouse=True на clear_db означает, что каждый тест получает чистое состояние без явного вызова. В тестах — только дополнительный setup.

Полный пример: Arrange через препарер

import pytest
from httpx import AsyncClient

from tests.support.order_database_preparer import OrderDatabasePreparer
from tests.support.order_object_generator import OrderObjectGenerator
from tests.fixtures.auth import success_token


@pytest.mark.asyncio
async def test_confirm_order_when_draft_returns_200(
    client: AsyncClient,
    order_preparer: OrderDatabasePreparer,
) -> None:
    order_id = "11111111-1111-1111-1111-111111111111"

    order = OrderObjectGenerator().with_id(order_id).with_status("DRAFT").build()
    await order_preparer.create_order(order).prepare()

    response = await client.post(
        f"/v1/orders/{order_id}/confirm",
        headers=success_token(),
    )

    assert response.status_code == 200

    outbox_rows = await order_preparer.find_outbox_events("OrderConfirmed")
    assert len(outbox_rows) == 1
    assert outbox_rows[0]["aggregate_id"] == order_id

Три блока с пустой строкой между ними:

  • Arrange — подготовка данных через ObjectGenerator + prepare().
  • Act — один await client.post(...).
  • Assert — проверка response и побочных эффектов (Outbox, состояние БД).

Find*-методы — для Assert-фазы

Препарер предоставляет find_*-методы для проверки состояния БД без обращения к бизнес-слою.

async def find_order(self, order_id: str) -> dict | None:
    result = await self._session.execute(
        text("SELECT id, customer_id, status, created_at FROM orders WHERE id = :id"),
        {"id": order_id},
    )
    row = result.mappings().one_or_none()
    return dict(row) if row else None

async def find_outbox_events(self, event_type: str) -> list[dict]:
    result = await self._session.execute(
        text("SELECT event_type, payload FROM outbox WHERE event_type = :event_type"),
        {"event_type": event_type},
    )
    return [dict(row) for row in result.mappings()]

Использование в Assert-фазе:

row = await order_preparer.find_order(order_id)
assert row["status"] == "CONFIRMED"

events = await order_preparer.find_outbox_events("OrderConfirmed")
assert len(events) == 1
assert events[0]["payload"]["customerId"] == customer_id

find_* работают на той же AsyncSession — транзакция единая, нет рассинхронизации между тем, что записал тест, и тем, что читает Assert.

Пример с доменом Customer / Product

class CustomerOrderDatabasePreparer:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session
        self._queue: list[Callable[[], Coroutine]] = []

    def clear_products(self) -> "CustomerOrderDatabasePreparer":
        async def _clear() -> None:
            await self._session.execute(text("DELETE FROM products"))
        self._queue.append(_clear)
        return self

    def clear_customers(self) -> "CustomerOrderDatabasePreparer":
        async def _clear() -> None:
            await self._session.execute(text("DELETE FROM customers"))
        self._queue.append(_clear)
        return self

    def create_customer(self, customer: dict) -> "CustomerOrderDatabasePreparer":
        async def _insert() -> None:
            await self._session.execute(
                text(
                    "INSERT INTO customers(id, name, email)"
                    " VALUES(:id, :name, :email)"
                ),
                customer,
            )
        self._queue.append(_insert)
        return self

    def create_product(self, product: dict) -> "CustomerOrderDatabasePreparer":
        async def _insert() -> None:
            await self._session.execute(
                text(
                    "INSERT INTO products(id, customer_id, title, price)"
                    " VALUES(:id, :customer_id, :title, :price)"
                ),
                product,
            )
        self._queue.append(_insert)
        return self

    async def clear_all(self) -> None:
        self.clear_products().clear_customers()
        await self.prepare()

    async def prepare(self) -> None:
        for step in self._queue:
            await step()
        self._queue.clear()
        await self._session.commit()

Использование в тесте:

async def test_get_products_when_customer_exists_returns_200(
    client: AsyncClient,
    customer_preparer: CustomerOrderDatabasePreparer,
) -> None:
    customer = CustomerObjectGenerator().with_id("c-sber").build()
    product = ProductObjectGenerator().with_customer_id("c-sber").build()

    await customer_preparer \
        .create_customer(customer) \
        .create_product(product) \
        .prepare()

    response = await client.get("/v1/customers/c-sber/products")

    assert response.status_code == 200
    assert response.json()["items"][0]["id"] == product["id"]

Что запрещено

АнтипаттернПравилоЧто взамен
await session.execute(text("INSERT INTO ...")) прямо в тестеPYTS-9метод create_*() в препарере
MagicMock на репозиторий в integration-тестеPYTS-9реальная AsyncSession, реальный Postgres
Base.metadata.drop_all() / create_all() между тестамиPYTS-X3DELETE через clear_*; схема один раз при старте
@pytest.fixture(autouse=True) с TRUNCATE без FK-порядкаPYTS-11clear_all() в правильном FK-порядке
FK нарушен при create_*PYTS-11сначала родительская запись, затем дочерняя
Один DatabasePreparer для всего сервисаPYTS-9отдельный класс на каждый Bounded Context
ORM-модели домена в методах препарераPYTS-9text() с dict; тест не зависит от доменного слоя
prepare() не вызван — очередь не выполненаPYTS-9.prepare() в конце цепочки; используй await

Куда дальше

  • python/basics — PostgresContainer, async_engine, asyncio_mode, принципы интеграционного теста.
  • python/fixtures — session-scoped фикстуры, dependency_overrides для Clock и IdGenerator.
  • python/object-generator — что передавать в create_order(...).
  • python/one-test — полный пример теста с clear_all + prepare в Arrange-фазе.
  • Persistence — SQLAlchemy — структура таблиц и FK, которую повторяет препарер.