Опирается на правила: PYTS-19, PYTS-20, PYTS-21, PYTS-22, PYTS-X1, PYTS-X4 из Python Test Strategy → раздел 6. Kafka, Redis, async — по умолчанию НЕТ.

Важно знать

  • Kafka не поднимаем в базовых интеграционных тестах — Testcontainers Kafka превращает тест в smoke и раздувает время с миллисекунд до минут.
  • Redis не поднимаем — профиль integration-test переключает cache backend на none или in-memory.
  • Outbox-события проверяем напрямую через DatabasePreparer — читаем из outbox-таблицы в Testcontainers-PostgreSQL.
  • Idempotent consumer тестируем вызовом await event_handler.handle(test_event) без брокера.
  • Outbox-relay переводим в синхрон: ручной commit + прямой вызов relay/handler, без фонового воркера.
  • asyncio.sleep/polling/tenacity в тесте — запрещены; признак недетерминированного дизайна.
  • E2E с реальной Kafka — отдельная группа @pytest.mark.e2e, отдельный CI-этап, не более 5–10 на сервис.

Когда заказ подтверждён, сервис публикует событие OrderConfirmed в Kafka и пишет результат в Redis-кеш. Интуитивный тест поднял бы Kafka-контейнер и проверил, что событие дошло до consumer. Это неправильно: такой тест занимает 30–60 секунд, нестабилен при перегрузке CI и тестирует инфраструктуру, а не бизнес-логику. UCP решает это иначе.

Почему Kafka и Redis остаются снаружи

Один интеграционный тест в UCP проверяет один сквозной сценарий от HTTP-запроса до строки в PostgreSQL. Всё, что выходит за пределы этого периметра — Kafka, Redis, внешние REST-сервисы — либо замокано, либо отключено профилем.

Граница принципиальная:

Что тестируемИнструмент
Бизнес-логика UseCase + хранение в PostgreSQLhttpx.AsyncClient + Testcontainers PG
Outbox-событие (факт записи)DatabasePreparer — читаем outbox-таблицу
Idempotent consumerпрямой вызов handler.handle(event)
E2E через реальную Kafka@pytest.mark.e2e, отдельный CI

PYTS-X4: Testcontainers Kafka/Redis в базовом интеграционном тесте — антипаттерн.

Профиль integration-test

Kafka и Redis отключаются через settings-профиль. Пример на Pydantic Settings:

class Settings(BaseSettings):
    cache_backend: str = "memory"
    kafka_enabled: bool = True

    model_config = SettingsConfigDict(
        env_file=".env",
        env_prefix="APP_",
    )


def get_settings() -> Settings:
    return Settings()

В тестовом conftest.py или через .env.test:

APP_KAFKA_ENABLED=false
APP_CACHE_BACKEND=none

И dependency_overrides на get_settings:

@pytest.fixture(scope="session", autouse=True)
def disable_kafka_and_cache(app: FastAPI) -> None:
    test_settings = Settings(kafka_enabled=False, cache_backend="none")
    app.dependency_overrides[get_settings] = lambda: test_settings

PYTS-20: Redis не поднимаем — cache backend переключён на none/in-memory.

Outbox вместо Kafka

PYTS-19: события — в Outbox-таблице, проверяем через DatabasePreparer.

Сервис заказов пишет событие не напрямую в Kafka, а в таблицу order_outbox. Отдельный relay-воркер читает её и публикует в Kafka. Тест проверяет, что после успешного подтверждения заказа строка в order_outbox появилась с правильным содержимым.

async def test_confirm_order_creates_outbox_event(
    client: AsyncClient,
    db_preparer: OrderDatabasePreparer,
    async_session: AsyncSession,
) -> None:
    order_id = uuid.UUID("11111111-1111-1111-1111-111111111111")
    draft = (
        OrderObjectGenerator()
        .with_id(order_id)
        .with_status(OrderStatus.DRAFT)
        .build()
    )
    await db_preparer.create_order(draft).prepare()

    response = await client.post(
        f"/v1/orders/{order_id}/confirm",
        headers=success_token(),
    )

    assert response.status_code == 200

    rows = await async_session.execute(
        select(OrderOutbox).where(OrderOutbox.aggregate_id == str(order_id))
    )
    events = rows.scalars().all()
    assert len(events) == 1
    assert events[0].event_type == "OrderConfirmed"
    payload = json.loads(events[0].payload)
    assert payload["order_id"] == str(order_id)

Что здесь:

  • Kafka не поднята — нет контейнера, нет polling-ожидания.
  • Тест завершается за 50–150 мс.
  • Проверяется факт записи события и его содержимое, а не то, что Kafka приняла сообщение.

Idempotent consumer — напрямую

PYTS-21: тест idempotent consumer-а — вызов handler.handle(event) без брокера.

Kafka consumer — это класс с методом handle, который принимает десериализованное событие и выполняет бизнес-логику. Тест вызывает его напрямую как обычный объект.

@pytest.fixture()
def product_reserved_handler(async_session: AsyncSession) -> ProductReservedHandler:
    return ProductReservedHandler(
        order_repository=SqlAlchemyOrderRepository(async_session),
        session=async_session,
    )


async def test_handle_product_reserved_updates_order_status(
    product_reserved_handler: ProductReservedHandler,
    db_preparer: OrderDatabasePreparer,
    async_session: AsyncSession,
) -> None:
    order_id = uuid.UUID("22222222-2222-2222-2222-222222222222")
    confirmed = (
        OrderObjectGenerator()
        .with_id(order_id)
        .with_status(OrderStatus.CONFIRMED)
        .build()
    )
    await db_preparer.create_order(confirmed).prepare()

    event = ProductReservedEvent(
        order_id=order_id,
        reserved_at=datetime(2026, 6, 1, 12, 0, 0, tzinfo=UTC),
    )

    await product_reserved_handler.handle(event)
    await async_session.commit()

    result = await async_session.get(Order, order_id)
    assert result is not None
    assert result.status == OrderStatus.RESERVED

Второй вызов с тем же событием — проверка идемпотентности:

async def test_handle_product_reserved_is_idempotent(
    product_reserved_handler: ProductReservedHandler,
    db_preparer: OrderDatabasePreparer,
    async_session: AsyncSession,
) -> None:
    order_id = uuid.UUID("33333333-3333-3333-3333-333333333333")
    confirmed = (
        OrderObjectGenerator()
        .with_id(order_id)
        .with_status(OrderStatus.CONFIRMED)
        .build()
    )
    await db_preparer.create_order(confirmed).prepare()

    event = ProductReservedEvent(
        order_id=order_id,
        reserved_at=datetime(2026, 6, 1, 12, 0, 0, tzinfo=UTC),
    )

    await product_reserved_handler.handle(event)
    await async_session.commit()

    await product_reserved_handler.handle(event)
    await async_session.commit()

    result = await async_session.get(Order, order_id)
    assert result is not None
    assert result.status == OrderStatus.RESERVED

Kafka не нужна — handler принимает Python-объект, всё тестируется через PostgreSQL.

Outbox-relay — синхронный вызов

PYTS-22: relay переводим в синхрон — ручной commit + прямой вызов, без фонового воркера.

В продакшене outbox-relay — фоновая задача (asyncio task или отдельный процесс): читает order_outbox, публикует в Kafka, помечает записи как отправленные. В тесте воркер не запущен. Тестируем relay как функцию:

async def test_outbox_relay_marks_events_as_sent(
    outbox_relay: OrderOutboxRelay,
    db_preparer: OrderDatabasePreparer,
    async_session: AsyncSession,
    mock_kafka_producer: MockKafkaProducer,
) -> None:
    order_id = uuid.UUID("44444444-4444-4444-4444-444444444444")
    await db_preparer.create_outbox_event(
        aggregate_id=str(order_id),
        event_type="OrderConfirmed",
        payload=json.dumps({"order_id": str(order_id)}),
    ).prepare()

    await outbox_relay.run_once()
    await async_session.commit()

    result = await async_session.execute(
        select(OrderOutbox).where(OrderOutbox.aggregate_id == str(order_id))
    )
    events = result.scalars().all()
    assert len(events) == 1
    assert events[0].sent_at is not None
    assert mock_kafka_producer.published_count() == 1

MockKafkaProducer — это in-memory реализация порта, не Testcontainers Kafka. Тест проверяет, что relay вызвал producer.send(...) и обновил запись. Реальная Kafka не нужна.

asyncio.sleep запрещён

PYTS-X1: asyncio.sleep/while-poll/tenacity-ожидания в тесте — признак недетерминированного дизайна.

# ПЛОХО — polling с ожиданием
async def test_order_confirmed_event_published():
    await client.post("/v1/orders/11111111/confirm", headers=success_token())

    for _ in range(10):
        await asyncio.sleep(0.5)
        events = await fetch_kafka_events("order.confirmed")
        if events:
            break

    assert len(events) == 1

Что плохо:

  • Нестабильно — на перегруженном CI цикл исчерпывается раньше, чем Kafka consumer успевает обработать.
  • Медленно — даже при успехе тест ждёт минимум одну итерацию sleep.
  • Скрывает баги — событие с задержкой 200 мс пройдёт в тесте с sleep(0.5), но в продакшене клиент уже получит ответ раньше, чем consumer обновит состояние downstream-сервиса.

Правильно: asyncio.sleep нет вообще. Если нужно проверить событие — смотрим в Outbox-таблицу (PYTS-19). Если нужно проверить consumer — вызываем handler.handle(event) (PYTS-21).

E2E с реальной Kafka — отдельный этап

PYTS-28: E2E через настоящую Kafka — @pytest.mark.e2e, отдельный CI-этап, не более 5–10 на сервис.

Когда E2E тест действительно нужен (например, проверить что событие прошло через Kafka и consumer обновил состояние в downstream-сервисе), он помечается маркером:

@pytest.mark.e2e
async def test_order_confirmed_event_reaches_customer_service():
    ...

В pytest.ini или pyproject.toml:

[tool.pytest.ini_options]
markers = [
    "e2e: end-to-end tests with real Kafka and external services (deselect with '-m not e2e')",
]

CI-конфигурация запускает их в отдельном шаге:

- name: Integration tests
  run: pytest -m "not e2e"

- name: E2E tests
  run: pytest -m e2e
  if: github.ref == 'refs/heads/main'

Базовый suite работает без Kafka-контейнера — быстро и стабильно на каждом PR. E2E — только на main или вручную.

Что запрещено

АнтипаттернПравилоЧто взамен
Testcontainers Kafka в базовом интеграционном тестеPYTS-X4Outbox-таблица + DatabasePreparer
Testcontainers Redis в тестеPYTS-20cache_backend=none в профиле
asyncio.sleep/while-poll в тестеPYTS-X1синхронный flow, нет ожидания
tenacity retry в тестеPYTS-X1детерминированный handler-вызов
Kafka consumer тест через реальный брокерPYTS-21await handler.handle(event)
Фоновый outbox-воркер в тестеPYTS-22relay.run_once() + commit
E2E-тест без маркера @pytest.mark.e2ePYTS-28явная маркировка и отдельный CI

Куда дальше

  • python/basics.md — базовые правила, синхронность, AAA-структура.
  • python/database-preparer.md — fluent setup БД для проверки Outbox.
  • python/one-test.md — структура теста: имена, success_token(), AsyncClient.
  • Внешние HTTP — мок — pytest-httpserver и respx вместо реальных сервисов.