Опирается на правила: R-DIST-EC-1R-DIST-EC-4 и R-DIST-EC-X1R-DIST-EC-X2 из Distributed Patterns Style Guide → раздел 4. Eventual consistency.

Важно знать

  • Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент должен знать, что данные могут отставать.
  • Декларация в FastAPI — для роутера, читающего eventual-consistent данные, задержка описывается в description операции или модели (response_description).
  • Read-your-writes не работает само: нужен polling на стороне клиента, synchronous wait в хендлере или отдельный endpoint, читающий из write-side.
  • Bounded staleness SLO — у каждой read-model явный лимит задержки («не более 5 секунд между write и появлением в проекции»), алерт в Prometheus если превышается.
  • Causal consistency — реализуется через version-поле агрегата; aiokafka-консьюмер применяет событие только если event.version > current_version, иначе skip.
  • Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает stale data — доверие к API падает.
  • Strict immediate consistency через 2PC не масштабируется; при высокой нагрузке — либо EC, либо redesign boundary.

В распределённой системе теорема CAP: нельзя одновременно иметь строгую согласованность, доступность и устойчивость к разделению. UCP выбирает доступность и устойчивость — согласованность у нас eventual. Это решение требует явного оформления, не молчаливого допущения.

Декларация в API

R-DIST-EC-1: для роутера, читающего eventual-consistent данные, ожидаемая задержка указывается в OpenAPI-описании через description или response_description.

from fastapi import APIRouter
from app.schemas import OrderSummaryListResponse

router = APIRouter()

@router.get(
    "/customers/{customer_id}/orders",
    response_model=OrderSummaryListResponse,
    summary="Получить список заказов клиента",
    description="""
Возвращает summary заказов клиента из денормализованной read-проекции.

**Eventual consistency**: задержка от write в order-service до появления
в этом endpoint обычно < 1 секунды (p99 < 5 секунд). Если нужна
immediate consistency сразу после создания заказа — использовать
`GET /orders/{order_id}`, который читает write-side.
""",
    response_description="Список заказов (могут отставать на 1-5 секунд)",
)
async def get_customer_orders(customer_id: str) -> OrderSummaryListResponse:
    ...

Без этой декларации клиентский разработчик напишет тест POST /orders + сразу GET /customers/{id}/orders и получит пустой список — и будет считать это багом.

Read-your-writes — три способа

R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача с несколькими решениями.

1. Polling на стороне клиента

Клиент получает order_id из POST /orders, затем polling-ом обращается к read-проекции до появления записи:

import asyncio
import httpx

async def create_order_with_read_your_writes(
    client: httpx.AsyncClient,
    payload: dict,
) -> dict:
    response = await client.post("/orders", json=payload)
    response.raise_for_status()
    order_id = response.json()["order_id"]

    for _ in range(20):
        await asyncio.sleep(0.2)
        r = await client.get(f"/customers/me/orders/{order_id}")
        if r.status_code == 200:
            return r.json()

    raise TimeoutError(f"Read-model did not reflect order {order_id} in time")

Простой подход; клиент платит latency. Применим для UI с прогресс-индикатором.

2. Synchronous wait в хендлере

Хендлер команды после write выполняет короткий polling read-model на стороне сервера и возвращает актуальный summary без перекладывания polling-логики на клиента:

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories import OrderRepository, OrderProjectionRepository
from app.schemas import OrderCreateCommand, OrderSummaryResponse

class CreateOrderHandler:
    def __init__(
        self,
        session: AsyncSession,
        orders: OrderRepository,
        projections: OrderProjectionRepository,
    ) -> None:
        self._session = session
        self._orders = orders
        self._projections = projections

    async def handle(self, cmd: OrderCreateCommand) -> OrderSummaryResponse:
        order = await self._orders.save(cmd.to_domain())
        await self._session.commit()

        for _ in range(10):
            await asyncio.sleep(0.2)
            projection = await self._projections.find_by_order_id(order.id)
            if projection is not None:
                return OrderSummaryResponse.from_projection(projection)

        return OrderSummaryResponse.from_write_side(order)

FastAPI-роутер возвращает то, что вернул хендлер — клиент получает либо проекцию, либо fallback из write-side. Подходит для критичных flow «после оформления заказа показать summary».

3. Отдельный endpoint из write-side

Для случаев read-your-writes — специальный endpoint, читающий из write-Repository (полный агрегат), не из read-проекции:

POST /orders                          → CreateOrderHandler (write)
GET  /orders/{order_id}               → write-side, immediate consistency
GET  /customers/{id}/orders           → read-projection, eventual consistency
@router.get(
    "/orders/{order_id}",
    response_model=OrderDetailResponse,
    description="Читает из write-side. Immediate consistency. Используйте сразу после POST /orders.",
)
async def get_order_write_side(
    order_id: str,
    orders: OrderRepository = Depends(get_order_repository),
) -> OrderDetailResponse:
    order = await orders.find_by_id(order_id)
    if order is None:
        raise HTTPException(status_code=404)
    return OrderDetailResponse.from_domain(order)

Read-проекция держит нагрузку на listing-запросы; immediate-endpoint обслуживает «только что после write».

Bounded staleness SLO

R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку: «p99 задержки между commit в write-side и появлением в read-проекции — не более 5 секунд». Измеряем через метрику.

from datetime import datetime, timezone
from prometheus_client import Histogram
from app.schemas import OrderCreatedEvent

READ_MODEL_STALENESS = Histogram(
    "read_model_staleness_seconds",
    "Задержка между write-side и появлением в read-проекции",
    labelnames=["model", "event_type"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

class OrderProjectionConsumer:
    async def handle(self, event: OrderCreatedEvent) -> None:
        staleness = (
            datetime.now(tz=timezone.utc) - event.occurred_at
        ).total_seconds()
        READ_MODEL_STALENESS.labels(
            model="order_summary",
            event_type=event.event_type,
        ).observe(staleness)

        await self._projections.upsert(self._to_projection(event))

Алерт в Prometheus:

- alert: ReadModelStalenessHigh
  expr: >
    histogram_quantile(
      0.99,
      sum by (le, model) (rate(read_model_staleness_seconds_bucket[5m]))
    ) > 5
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Read-model {{ $labels.model }} отстаёт больше 5 секунд (p99)"

Без SLO read-model может тихо отстать на десятки минут — никто не заметит до жалоб клиентов.

Causal consistency через version

R-DIST-EC-4: когда порядок событий важен, aiokafka-консьюмер проверяет монотонность version-поля и пропускает out-of-order события.

from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories import OrderProjectionRepository

class OrderUpdatedEvent(BaseModel):
    order_id: str
    aggregate_version: int
    status: str
    total_amount: int
    occurred_at: datetime

class OrderProjectionConsumer:
    def __init__(
        self,
        session: AsyncSession,
        projections: OrderProjectionRepository,
    ) -> None:
        self._session = session
        self._projections = projections

    async def handle(self, event: OrderUpdatedEvent) -> None:
        current = await self._projections.find_by_order_id(event.order_id)

        if current is not None and event.aggregate_version <= current.version:
            return

        await self._projections.upsert(
            order_id=event.order_id,
            version=event.aggregate_version,
            status=event.status,
            total_amount=event.total_amount,
        )
        await self._session.commit()

aggregate_version — монотонно растущий счётчик, увеличивается при каждом изменении агрегата в write-side. При rebalance или retry aiokafka может доставить события не в исходном порядке; version-check гарантирует, что финальное состояние проекции корректное.

Регистрация консьюмера через aiokafka:

from aiokafka import AIOKafkaConsumer
import orjson

async def run_order_projection_consumer(consumer: OrderProjectionConsumer) -> None:
    kafka = AIOKafkaConsumer(
        "order.events",
        bootstrap_servers="kafka:9092",
        group_id="order-projection",
        enable_auto_commit=False,
    )
    await kafka.start()
    try:
        async for msg in kafka:
            event = OrderUpdatedEvent.model_validate(orjson.loads(msg.value))
            await consumer.handle(event)
            await kafka.commit()
    finally:
        await kafka.stop()

Vector clocks нужны редко — скалярного aggregate_version на агрегат достаточно для большинства сценариев.

Что запрещено

АнтипаттернПравилоЧто взамен
Endpoint возвращает stale data без упоминания ECR-DIST-EC-X1description с ожидаемой задержкой в FastAPI-роутере
2PC/XA для immediate consistencyR-DIST-EC-X2redesign boundary или принять EC
Read-model без явного SLOR-DIST-EC-3bounded staleness + Prometheus alert
Out-of-order события применяются без проверкиR-DIST-EC-4version-check перед upsert в консьюмере
Polling без timeout и fallbackR-DIST-EC-2timeout + OrderSummaryResponse.from_write_side(order)
Read-your-writes polling на каждый GETR-DIST-EC-2отдельный /orders/{id} из write-side только для сценария «после write»

Куда дальше

  • Outbox + Inbox — главный механизм синхронизации в eventual-consistent системе.
  • Idempotency — aiokafka-консьюмер должен быть идемпотентным при retry.
  • Saga — saga_id сквозной; in-flight саги — отдельный случай eventual consistency.
  • Compensation — семантический откат вместо технического; оставляет audit trail.
  • Distributed transactions — почему 2PC/XA не вариант в Python-стеке.
  • Когда нужны распределённые паттерны — проверь альтернативы перед введением EC.