Опирается на правила:
PYTS-19,PYTS-20,PYTS-21,PYTS-22,PYTS-X1,PYTS-X4из Python Test Strategy → раздел 6. Kafka, Redis, async — по умолчанию НЕТ.
Важно знать
- Kafka не поднимаем в базовых интеграционных тестах — Testcontainers Kafka превращает тест в smoke и раздувает время с миллисекунд до минут.
- Redis не поднимаем — профиль
integration-testпереключает cache backend наnoneили in-memory.- Outbox-события проверяем напрямую через
DatabasePreparer— читаем изoutbox-таблицы в Testcontainers-PostgreSQL.- Idempotent consumer тестируем вызовом
await event_handler.handle(test_event)без брокера.- Outbox-relay переводим в синхрон: ручной
commit+ прямой вызов relay/handler, без фонового воркера.asyncio.sleep/polling/tenacityв тесте — запрещены; признак недетерминированного дизайна.- E2E с реальной Kafka — отдельная группа
@pytest.mark.e2e, отдельный CI-этап, не более 5–10 на сервис.
Когда заказ подтверждён, сервис публикует событие OrderConfirmed в Kafka и пишет результат в Redis-кеш. Интуитивный тест поднял бы Kafka-контейнер и проверил, что событие дошло до consumer. Это неправильно: такой тест занимает 30–60 секунд, нестабилен при перегрузке CI и тестирует инфраструктуру, а не бизнес-логику. UCP решает это иначе.
Почему Kafka и Redis остаются снаружи
Один интеграционный тест в UCP проверяет один сквозной сценарий от HTTP-запроса до строки в PostgreSQL. Всё, что выходит за пределы этого периметра — Kafka, Redis, внешние REST-сервисы — либо замокано, либо отключено профилем.
Граница принципиальная:
| Что тестируем | Инструмент |
|---|---|
| Бизнес-логика UseCase + хранение в PostgreSQL | httpx.AsyncClient + Testcontainers PG |
| Outbox-событие (факт записи) | DatabasePreparer — читаем outbox-таблицу |
| Idempotent consumer | прямой вызов handler.handle(event) |
| E2E через реальную Kafka | @pytest.mark.e2e, отдельный CI |
PYTS-X4: Testcontainers Kafka/Redis в базовом интеграционном тесте — антипаттерн.
Профиль integration-test
Kafka и Redis отключаются через settings-профиль. Пример на Pydantic Settings:
class Settings(BaseSettings):
cache_backend: str = "memory"
kafka_enabled: bool = True
model_config = SettingsConfigDict(
env_file=".env",
env_prefix="APP_",
)
def get_settings() -> Settings:
return Settings()
В тестовом conftest.py или через .env.test:
APP_KAFKA_ENABLED=false
APP_CACHE_BACKEND=none
И dependency_overrides на get_settings:
@pytest.fixture(scope="session", autouse=True)
def disable_kafka_and_cache(app: FastAPI) -> None:
test_settings = Settings(kafka_enabled=False, cache_backend="none")
app.dependency_overrides[get_settings] = lambda: test_settings
PYTS-20: Redis не поднимаем — cache backend переключён на none/in-memory.
Outbox вместо Kafka
PYTS-19: события — в Outbox-таблице, проверяем через DatabasePreparer.
Сервис заказов пишет событие не напрямую в Kafka, а в таблицу order_outbox. Отдельный relay-воркер читает её и публикует в Kafka. Тест проверяет, что после успешного подтверждения заказа строка в order_outbox появилась с правильным содержимым.
async def test_confirm_order_creates_outbox_event(
client: AsyncClient,
db_preparer: OrderDatabasePreparer,
async_session: AsyncSession,
) -> None:
order_id = uuid.UUID("11111111-1111-1111-1111-111111111111")
draft = (
OrderObjectGenerator()
.with_id(order_id)
.with_status(OrderStatus.DRAFT)
.build()
)
await db_preparer.create_order(draft).prepare()
response = await client.post(
f"/v1/orders/{order_id}/confirm",
headers=success_token(),
)
assert response.status_code == 200
rows = await async_session.execute(
select(OrderOutbox).where(OrderOutbox.aggregate_id == str(order_id))
)
events = rows.scalars().all()
assert len(events) == 1
assert events[0].event_type == "OrderConfirmed"
payload = json.loads(events[0].payload)
assert payload["order_id"] == str(order_id)
Что здесь:
- Kafka не поднята — нет контейнера, нет polling-ожидания.
- Тест завершается за 50–150 мс.
- Проверяется факт записи события и его содержимое, а не то, что Kafka приняла сообщение.
Idempotent consumer — напрямую
PYTS-21: тест idempotent consumer-а — вызов handler.handle(event) без брокера.
Kafka consumer — это класс с методом handle, который принимает десериализованное событие и выполняет бизнес-логику. Тест вызывает его напрямую как обычный объект.
@pytest.fixture()
def product_reserved_handler(async_session: AsyncSession) -> ProductReservedHandler:
return ProductReservedHandler(
order_repository=SqlAlchemyOrderRepository(async_session),
session=async_session,
)
async def test_handle_product_reserved_updates_order_status(
product_reserved_handler: ProductReservedHandler,
db_preparer: OrderDatabasePreparer,
async_session: AsyncSession,
) -> None:
order_id = uuid.UUID("22222222-2222-2222-2222-222222222222")
confirmed = (
OrderObjectGenerator()
.with_id(order_id)
.with_status(OrderStatus.CONFIRMED)
.build()
)
await db_preparer.create_order(confirmed).prepare()
event = ProductReservedEvent(
order_id=order_id,
reserved_at=datetime(2026, 6, 1, 12, 0, 0, tzinfo=UTC),
)
await product_reserved_handler.handle(event)
await async_session.commit()
result = await async_session.get(Order, order_id)
assert result is not None
assert result.status == OrderStatus.RESERVED
Второй вызов с тем же событием — проверка идемпотентности:
async def test_handle_product_reserved_is_idempotent(
product_reserved_handler: ProductReservedHandler,
db_preparer: OrderDatabasePreparer,
async_session: AsyncSession,
) -> None:
order_id = uuid.UUID("33333333-3333-3333-3333-333333333333")
confirmed = (
OrderObjectGenerator()
.with_id(order_id)
.with_status(OrderStatus.CONFIRMED)
.build()
)
await db_preparer.create_order(confirmed).prepare()
event = ProductReservedEvent(
order_id=order_id,
reserved_at=datetime(2026, 6, 1, 12, 0, 0, tzinfo=UTC),
)
await product_reserved_handler.handle(event)
await async_session.commit()
await product_reserved_handler.handle(event)
await async_session.commit()
result = await async_session.get(Order, order_id)
assert result is not None
assert result.status == OrderStatus.RESERVED
Kafka не нужна — handler принимает Python-объект, всё тестируется через PostgreSQL.
Outbox-relay — синхронный вызов
PYTS-22: relay переводим в синхрон — ручной commit + прямой вызов, без фонового воркера.
В продакшене outbox-relay — фоновая задача (asyncio task или отдельный процесс): читает order_outbox, публикует в Kafka, помечает записи как отправленные. В тесте воркер не запущен. Тестируем relay как функцию:
async def test_outbox_relay_marks_events_as_sent(
outbox_relay: OrderOutboxRelay,
db_preparer: OrderDatabasePreparer,
async_session: AsyncSession,
mock_kafka_producer: MockKafkaProducer,
) -> None:
order_id = uuid.UUID("44444444-4444-4444-4444-444444444444")
await db_preparer.create_outbox_event(
aggregate_id=str(order_id),
event_type="OrderConfirmed",
payload=json.dumps({"order_id": str(order_id)}),
).prepare()
await outbox_relay.run_once()
await async_session.commit()
result = await async_session.execute(
select(OrderOutbox).where(OrderOutbox.aggregate_id == str(order_id))
)
events = result.scalars().all()
assert len(events) == 1
assert events[0].sent_at is not None
assert mock_kafka_producer.published_count() == 1
MockKafkaProducer — это in-memory реализация порта, не Testcontainers Kafka. Тест проверяет, что relay вызвал producer.send(...) и обновил запись. Реальная Kafka не нужна.
asyncio.sleep запрещён
PYTS-X1: asyncio.sleep/while-poll/tenacity-ожидания в тесте — признак недетерминированного дизайна.
# ПЛОХО — polling с ожиданием
async def test_order_confirmed_event_published():
await client.post("/v1/orders/11111111/confirm", headers=success_token())
for _ in range(10):
await asyncio.sleep(0.5)
events = await fetch_kafka_events("order.confirmed")
if events:
break
assert len(events) == 1
Что плохо:
- Нестабильно — на перегруженном CI цикл исчерпывается раньше, чем Kafka consumer успевает обработать.
- Медленно — даже при успехе тест ждёт минимум одну итерацию
sleep. - Скрывает баги — событие с задержкой 200 мс пройдёт в тесте с
sleep(0.5), но в продакшене клиент уже получит ответ раньше, чем consumer обновит состояние downstream-сервиса.
Правильно: asyncio.sleep нет вообще. Если нужно проверить событие — смотрим в Outbox-таблицу (PYTS-19). Если нужно проверить consumer — вызываем handler.handle(event) (PYTS-21).
E2E с реальной Kafka — отдельный этап
PYTS-28: E2E через настоящую Kafka — @pytest.mark.e2e, отдельный CI-этап, не более 5–10 на сервис.
Когда E2E тест действительно нужен (например, проверить что событие прошло через Kafka и consumer обновил состояние в downstream-сервисе), он помечается маркером:
@pytest.mark.e2e
async def test_order_confirmed_event_reaches_customer_service():
...
В pytest.ini или pyproject.toml:
[tool.pytest.ini_options]
markers = [
"e2e: end-to-end tests with real Kafka and external services (deselect with '-m not e2e')",
]
CI-конфигурация запускает их в отдельном шаге:
- name: Integration tests
run: pytest -m "not e2e"
- name: E2E tests
run: pytest -m e2e
if: github.ref == 'refs/heads/main'
Базовый suite работает без Kafka-контейнера — быстро и стабильно на каждом PR. E2E — только на main или вручную.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Testcontainers Kafka в базовом интеграционном тесте | PYTS-X4 | Outbox-таблица + DatabasePreparer |
| Testcontainers Redis в тесте | PYTS-20 | cache_backend=none в профиле |
asyncio.sleep/while-poll в тесте | PYTS-X1 | синхронный flow, нет ожидания |
tenacity retry в тесте | PYTS-X1 | детерминированный handler-вызов |
| Kafka consumer тест через реальный брокер | PYTS-21 | await handler.handle(event) |
| Фоновый outbox-воркер в тесте | PYTS-22 | relay.run_once() + commit |
E2E-тест без маркера @pytest.mark.e2e | PYTS-28 | явная маркировка и отдельный CI |
Куда дальше
- python/basics.md — базовые правила, синхронность, AAA-структура.
- python/database-preparer.md — fluent setup БД для проверки Outbox.
- python/one-test.md — структура теста: имена,
success_token(),AsyncClient. - Внешние HTTP — мок —
pytest-httpserverиrespxвместо реальных сервисов.