Опирается на правила:
R-DIST-EC-1…R-DIST-EC-4иR-DIST-EC-X1…R-DIST-EC-X2из Distributed Patterns Style Guide → раздел 4. Eventual consistency.
Важно знать
- Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент должен знать, что данные могут отставать.
- Декларация в FastAPI — для роутера, читающего eventual-consistent данные, задержка описывается в
descriptionоперации или модели (response_description).- Read-your-writes не работает само: нужен polling на стороне клиента, synchronous wait в хендлере или отдельный endpoint, читающий из write-side.
- Bounded staleness SLO — у каждой read-model явный лимит задержки («не более 5 секунд между write и появлением в проекции»), алерт в Prometheus если превышается.
- Causal consistency — реализуется через
version-поле агрегата; aiokafka-консьюмер применяет событие только еслиevent.version > current_version, иначе skip.- Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает stale data — доверие к API падает.
- Strict immediate consistency через 2PC не масштабируется; при высокой нагрузке — либо EC, либо redesign boundary.
В распределённой системе теорема CAP: нельзя одновременно иметь строгую согласованность, доступность и устойчивость к разделению. UCP выбирает доступность и устойчивость — согласованность у нас eventual. Это решение требует явного оформления, не молчаливого допущения.
Декларация в API
R-DIST-EC-1: для роутера, читающего eventual-consistent данные, ожидаемая задержка указывается в OpenAPI-описании через description или response_description.
from fastapi import APIRouter
from app.schemas import OrderSummaryListResponse
router = APIRouter()
@router.get(
"/customers/{customer_id}/orders",
response_model=OrderSummaryListResponse,
summary="Получить список заказов клиента",
description="""
Возвращает summary заказов клиента из денормализованной read-проекции.
**Eventual consistency**: задержка от write в order-service до появления
в этом endpoint обычно < 1 секунды (p99 < 5 секунд). Если нужна
immediate consistency сразу после создания заказа — использовать
`GET /orders/{order_id}`, который читает write-side.
""",
response_description="Список заказов (могут отставать на 1-5 секунд)",
)
async def get_customer_orders(customer_id: str) -> OrderSummaryListResponse:
...
Без этой декларации клиентский разработчик напишет тест POST /orders + сразу GET /customers/{id}/orders и получит пустой список — и будет считать это багом.
Read-your-writes — три способа
R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача с несколькими решениями.
1. Polling на стороне клиента
Клиент получает order_id из POST /orders, затем polling-ом обращается к read-проекции до появления записи:
import asyncio
import httpx
async def create_order_with_read_your_writes(
client: httpx.AsyncClient,
payload: dict,
) -> dict:
response = await client.post("/orders", json=payload)
response.raise_for_status()
order_id = response.json()["order_id"]
for _ in range(20):
await asyncio.sleep(0.2)
r = await client.get(f"/customers/me/orders/{order_id}")
if r.status_code == 200:
return r.json()
raise TimeoutError(f"Read-model did not reflect order {order_id} in time")
Простой подход; клиент платит latency. Применим для UI с прогресс-индикатором.
2. Synchronous wait в хендлере
Хендлер команды после write выполняет короткий polling read-model на стороне сервера и возвращает актуальный summary без перекладывания polling-логики на клиента:
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories import OrderRepository, OrderProjectionRepository
from app.schemas import OrderCreateCommand, OrderSummaryResponse
class CreateOrderHandler:
def __init__(
self,
session: AsyncSession,
orders: OrderRepository,
projections: OrderProjectionRepository,
) -> None:
self._session = session
self._orders = orders
self._projections = projections
async def handle(self, cmd: OrderCreateCommand) -> OrderSummaryResponse:
order = await self._orders.save(cmd.to_domain())
await self._session.commit()
for _ in range(10):
await asyncio.sleep(0.2)
projection = await self._projections.find_by_order_id(order.id)
if projection is not None:
return OrderSummaryResponse.from_projection(projection)
return OrderSummaryResponse.from_write_side(order)
FastAPI-роутер возвращает то, что вернул хендлер — клиент получает либо проекцию, либо fallback из write-side. Подходит для критичных flow «после оформления заказа показать summary».
3. Отдельный endpoint из write-side
Для случаев read-your-writes — специальный endpoint, читающий из write-Repository (полный агрегат), не из read-проекции:
POST /orders → CreateOrderHandler (write)
GET /orders/{order_id} → write-side, immediate consistency
GET /customers/{id}/orders → read-projection, eventual consistency
@router.get(
"/orders/{order_id}",
response_model=OrderDetailResponse,
description="Читает из write-side. Immediate consistency. Используйте сразу после POST /orders.",
)
async def get_order_write_side(
order_id: str,
orders: OrderRepository = Depends(get_order_repository),
) -> OrderDetailResponse:
order = await orders.find_by_id(order_id)
if order is None:
raise HTTPException(status_code=404)
return OrderDetailResponse.from_domain(order)
Read-проекция держит нагрузку на listing-запросы; immediate-endpoint обслуживает «только что после write».
Bounded staleness SLO
R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку: «p99 задержки между commit в write-side и появлением в read-проекции — не более 5 секунд». Измеряем через метрику.
from datetime import datetime, timezone
from prometheus_client import Histogram
from app.schemas import OrderCreatedEvent
READ_MODEL_STALENESS = Histogram(
"read_model_staleness_seconds",
"Задержка между write-side и появлением в read-проекции",
labelnames=["model", "event_type"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)
class OrderProjectionConsumer:
async def handle(self, event: OrderCreatedEvent) -> None:
staleness = (
datetime.now(tz=timezone.utc) - event.occurred_at
).total_seconds()
READ_MODEL_STALENESS.labels(
model="order_summary",
event_type=event.event_type,
).observe(staleness)
await self._projections.upsert(self._to_projection(event))
Алерт в Prometheus:
- alert: ReadModelStalenessHigh
expr: >
histogram_quantile(
0.99,
sum by (le, model) (rate(read_model_staleness_seconds_bucket[5m]))
) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "Read-model {{ $labels.model }} отстаёт больше 5 секунд (p99)"
Без SLO read-model может тихо отстать на десятки минут — никто не заметит до жалоб клиентов.
Causal consistency через version
R-DIST-EC-4: когда порядок событий важен, aiokafka-консьюмер проверяет монотонность version-поля и пропускает out-of-order события.
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories import OrderProjectionRepository
class OrderUpdatedEvent(BaseModel):
order_id: str
aggregate_version: int
status: str
total_amount: int
occurred_at: datetime
class OrderProjectionConsumer:
def __init__(
self,
session: AsyncSession,
projections: OrderProjectionRepository,
) -> None:
self._session = session
self._projections = projections
async def handle(self, event: OrderUpdatedEvent) -> None:
current = await self._projections.find_by_order_id(event.order_id)
if current is not None and event.aggregate_version <= current.version:
return
await self._projections.upsert(
order_id=event.order_id,
version=event.aggregate_version,
status=event.status,
total_amount=event.total_amount,
)
await self._session.commit()
aggregate_version — монотонно растущий счётчик, увеличивается при каждом изменении агрегата в write-side. При rebalance или retry aiokafka может доставить события не в исходном порядке; version-check гарантирует, что финальное состояние проекции корректное.
Регистрация консьюмера через aiokafka:
from aiokafka import AIOKafkaConsumer
import orjson
async def run_order_projection_consumer(consumer: OrderProjectionConsumer) -> None:
kafka = AIOKafkaConsumer(
"order.events",
bootstrap_servers="kafka:9092",
group_id="order-projection",
enable_auto_commit=False,
)
await kafka.start()
try:
async for msg in kafka:
event = OrderUpdatedEvent.model_validate(orjson.loads(msg.value))
await consumer.handle(event)
await kafka.commit()
finally:
await kafka.stop()
Vector clocks нужны редко — скалярного aggregate_version на агрегат достаточно для большинства сценариев.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Endpoint возвращает stale data без упоминания EC | R-DIST-EC-X1 | description с ожидаемой задержкой в FastAPI-роутере |
| 2PC/XA для immediate consistency | R-DIST-EC-X2 | redesign boundary или принять EC |
| Read-model без явного SLO | R-DIST-EC-3 | bounded staleness + Prometheus alert |
| Out-of-order события применяются без проверки | R-DIST-EC-4 | version-check перед upsert в консьюмере |
| Polling без timeout и fallback | R-DIST-EC-2 | timeout + OrderSummaryResponse.from_write_side(order) |
| Read-your-writes polling на каждый GET | R-DIST-EC-2 | отдельный /orders/{id} из write-side только для сценария «после write» |
Куда дальше
- Outbox + Inbox — главный механизм синхронизации в eventual-consistent системе.
- Idempotency — aiokafka-консьюмер должен быть идемпотентным при retry.
- Saga —
saga_idсквозной; in-flight саги — отдельный случай eventual consistency. - Compensation — семантический откат вместо технического; оставляет audit trail.
- Distributed transactions — почему 2PC/XA не вариант в Python-стеке.
- Когда нужны распределённые паттерны — проверь альтернативы перед введением EC.