Опирается на правила:
R-CQRS-RM-1…R-CQRS-RM-4иR-CQRS-RM-X1…R-CQRS-RM-X3из CQRS Style Guide → раздел 4. Read-model.
Важно знать
- Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, без join'ов на горячем пути.
- Хранилище выбирается под паттерн чтения: PG-таблица, Redis, ElasticSearch — не одно «универсальное».
- Schema read-model независима от write-схемы: один атрибут может присутствовать в нескольких read-DTO под разными именами.
- Читается через
<X>ViewRepository— rawtext()select через read-onlyAsyncSessionбез commit (R-SQLA-QRY-5).- Обновляется через события (outbox → Kafka → consumer), не синхронно в write-транзакции (
R-CQRS-RM-3).- Read-model восстановима из write-side: при потере — rebuilder проходит по агрегатам и заново строит проекцию (
R-CQRS-RM-4).- Source of truth — write-side агрегаты. Read-model — производная.
- Никакой бизнес-логики в read-model (CHECK-инварианты, триггеры). Никакого bidirectional sync (
R-CQRS-RM-X1,R-CQRS-RM-X3).
Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет конечный потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. Статья раскрывает раздел 4 CQRS Style Guide в идиомах FastAPI / SQLAlchemy.
Где хранить read-model
R-CQRS-RM-1: выбор хранилища — функция паттерна чтения, не предпочтений команды.
| Паттерн чтения | Хранилище | Почему |
|---|---|---|
| Tabular query с пагинацией, фильтром, сортировкой | Денормализованная PG-таблица | Реляционка хорошо умеет такой workload, sync через outbox |
| Тяжёлые aggregations (GROUP BY миллионов строк) | PG materialized view | Pre-computed, refresh по расписанию или событию |
| Key-lookup hot-keys (по ID, по короткому ключу) | Redis | Sub-millisecond latency, горизонтальное масштабирование |
| Full-text search, multi-field фильтры с relevance | ElasticSearch / OpenSearch | Inverted index, ranking, faceted search |
PG-таблица — дефолт
Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.
CREATE TABLE order_summary (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
customer_name TEXT NOT NULL,
customer_email TEXT NOT NULL,
status TEXT NOT NULL,
item_count INTEGER NOT NULL,
total_amount NUMERIC(19,4) NOT NULL,
currency TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
confirmed_at TIMESTAMPTZ,
shipped_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
version BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);
Поле version — для idempotent UPDATE в consumer'е (см. Sync через события).
PG materialized view — для тяжёлых aggregations
Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, хранится заранее:
CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
p.product_id,
p.name,
DATE(oi.created_at) AS day,
SUM(oi.quantity * oi.unit_price) AS revenue,
COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);
CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);
Refresh — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (APScheduler или Celery Beat) или по событию OrderConfirmed.
Redis — для hot-key lookup
Проекция Customer → ActiveSubscriptionPlan — читается на каждый запрос пользователя:
# adapters/out/persistence/redis_subscription_view_repository.py
from dataclasses import dataclass
from redis.asyncio import Redis
@dataclass(frozen=True)
class SubscriptionPlan:
plan: str
expires_at: str
class RedisSubscriptionViewRepository:
def __init__(self, redis: Redis) -> None:
self._redis = redis
async def find_by_customer(self, customer_id: int) -> SubscriptionPlan | None:
raw = await self._redis.get(f"customer:{customer_id}:plan")
if raw is None:
return None
data = json.loads(raw)
return SubscriptionPlan(plan=data["plan"], expires_at=data["expires_at"])
async def upsert(self, customer_id: int, plan: SubscriptionPlan) -> None:
await self._redis.set(
f"customer:{customer_id}:plan",
json.dumps({"plan": plan.plan, "expires_at": plan.expires_at}),
ex=3600,
)
Consumer на SubscriptionUpdated вызывает upsert. Разница с обычным кешем: read-model в Redis — источник ответа, не fallback.
ElasticSearch — для full-text search
Поиск по описаниям Product, фильтры по 20+ атрибутам с relevance:
# adapters/out/search/es_product_view_repository.py
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch
@dataclass(frozen=True)
class ProductSearchResult:
product_id: int
name: str
price: int
in_stock: bool
rating: float
class ElasticsearchProductViewRepository:
def __init__(self, es: AsyncElasticsearch) -> None:
self._es = es
async def search(
self,
q: str,
min_rating: float | None = None,
in_stock: bool | None = None,
) -> list[ProductSearchResult]:
must = [{"match": {"name": q}}]
filters = []
if min_rating is not None:
filters.append({"range": {"rating": {"gte": min_rating}}})
if in_stock is not None:
filters.append({"term": {"in_stock": in_stock}})
resp = await self._es.search(
index="products",
query={"bool": {"must": must, "filter": filters}},
)
return [_to_result(hit["_source"]) for hit in resp["hits"]["hits"]]
Consumer'ы на ProductCreated, ProductPriceChanged, StockUpdated обновляют документ в индексе.
Schema независимая от write-стороны
R-CQRS-RM-2: read-схема и read-DTO продиктованы потребителем, не агрегатом.
write-схема: read-схема (order_summary):
order(id, customer_id, status) order_summary(
order_item(order_id, qty, price) order_id,
customer(id, name, email) customer_name, ← денормализовано из customer
customer_email, ← денормализовано из customer
status,
item_count ← pre-computed из order_item
)
В Python read-DTO — @dataclass(frozen=True) или Pydantic-модель (model_config = ConfigDict(frozen=True)):
# core/order/port/out/view/order_summary_view.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
@dataclass(frozen=True)
class OrderSummaryView:
order_id: int
customer_id: int
customer_name: str
customer_email: str
status: str
item_count: int
total_amount: Decimal
currency: str
created_at: datetime
confirmed_at: datetime | None
def to_order_summary_view(row: dict) -> OrderSummaryView:
return OrderSummaryView(
order_id=int(row["order_id"]),
customer_id=int(row["customer_id"]),
customer_name=str(row["customer_name"]),
customer_email=str(row["customer_email"]),
status=str(row["status"]),
item_count=int(row["item_count"]),
total_amount=Decimal(str(row["total_amount"])),
currency=str(row["currency"]),
created_at=row["created_at"],
confirmed_at=row.get("confirmed_at"),
)
ViewRepository — raw select через read-only сессию
R-CQRS-QRY-2, R-CQRS-TIER-3: read-side использует отдельный <X>ViewRepository, который читает через raw text() select в read-only AsyncSession — без commit, без FOR UPDATE, без eager-load агрегата (R-SQLA-QRY-5).
# core/order/port/out/order_view_repository.py
from typing import Protocol
class OrderViewRepository(Protocol):
async def summary(self, order_id: int) -> OrderSummaryView | None: ...
async def list_by_customer(
self, customer_id: int, limit: int, offset: int
) -> list[OrderSummaryView]: ...
# adapters/out/persistence/sqlalchemy_order_view_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class SqlAlchemyOrderViewRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def summary(self, order_id: int) -> OrderSummaryView | None:
result = await self._session.execute(
text(
"""
SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE order_id = :order_id
"""
),
{"order_id": order_id},
)
row = result.mappings().one_or_none()
return to_order_summary_view(dict(row)) if row else None
async def list_by_customer(
self, customer_id: int, limit: int, offset: int
) -> list[OrderSummaryView]:
result = await self._session.execute(
text(
"""
SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE customer_id = :customer_id
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
"""
),
{"customer_id": customer_id, "limit": limit, "offset": offset},
)
return [to_order_summary_view(dict(r)) for r in result.mappings()]
Read-only сессия создаётся через отдельную фабрику — без autocommit, commit вызывать нельзя:
# infrastructure/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
read_engine = create_async_engine(READ_DSN, pool_size=10)
ReadSession: async_sessionmaker[AsyncSession] = async_sessionmaker(
read_engine, expire_on_commit=False
)
Query-handler инжектирует OrderViewRepository, не вызывает доменных методов агрегата:
# core/order/usecase/get_order_summary.py
from dataclasses import dataclass
@dataclass(frozen=True)
class GetOrderSummary:
order_id: int
class GetOrderSummaryHandler:
def __init__(self, order_view: OrderViewRepository) -> None:
self._order_view = order_view
async def handle(self, query: GetOrderSummary) -> OrderSummaryView | None:
return await self._order_view.summary(query.order_id)
Обновление через события — eventual consistency
R-CQRS-RM-3: read-model обновляется только через outbox → Kafka → consumer. Никогда — в write-транзакции.
1. command-handler сохраняет Order, записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = :confirmed_at, version = version + 1
WHERE order_id = :order_id AND version < :event_version
Latency в стационарном режиме — 100ms–1s. Это архитектурно ожидаемо: UI должен понимать eventual consistency.
Почему не synchronous UPDATE в той же транзакции:
- Read-model теряет decoupling.
ALTER TABLE order_summaryблокирует write-транзакции. - При rollback write-агрегата
order_summaryуже могла измениться (если consumer внешний), и это расхождение не откатить. - Cross-DB synchronous sync невозможен без 2PC, который запрещён.
Подробно — в Sync через события.
Read-model восстановима из write-side
R-CQRS-RM-4: для каждой read-model должен существовать скрипт rebuild'а, который проходит по write-side агрегатам и заново строит проекцию.
# core/order/service/order_summary_rebuilder.py
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class OrderSummaryRebuilder:
def __init__(
self,
orders: OrderRepository,
order_view: OrderViewRepository,
) -> None:
self._orders = orders
self._order_view = order_view
async def rebuild_all(self) -> None:
last_id = 0
batch_size = 500
while True:
batch = await self._orders.find_all_after(last_id, batch_size)
if not batch:
break
rows = [self._to_summary_row(order) for order in batch]
await self._order_view.upsert_batch(rows)
last_id = batch[-1].id.value
logger.info("rebuild progress: last_id=%d", last_id)
def _to_summary_row(self, order: Order) -> OrderSummaryUpsert:
return OrderSummaryUpsert(
order_id=order.id.value,
customer_id=order.customer_id.value,
customer_name=order.snapshot().customer_name,
customer_email=order.snapshot().customer_email,
status=order.status.value,
item_count=len(order.items),
total_amount=order.total_amount.amount,
currency=order.total_amount.currency,
created_at=order.created_at,
confirmed_at=order.confirmed_at,
updated_at=datetime.now(tz=timezone.utc),
version=0,
)
Rebuild-сервис применяется в трёх сценариях:
- Disaster recovery. Read-model потеряна (отказ Redis cluster, drop в ElasticSearch, миграция).
- Bootstrap нового read-store. Добавляется ElasticSearch — он пустой, надо загнать existing-данные.
- Структурная миграция read-схемы. Новое поле в
order_summary— для старых записей rebuild дозаполнит.
Без rebuild-скрипта read-model становится первичным хранилищем — что нарушает R-CQRS-RM-X2.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| CHECK-constraint бизнес-инварианта в read-таблице | R-CQRS-RM-X1 | Инвариант в агрегате; read-model — только проекция |
| Read-model как единственный источник правды (невосстановимая) | R-CQRS-RM-X2 | Source of truth — write-side; rebuild-скрипт обязателен |
| Bidirectional sync: read-handler пишет в write-side | R-CQRS-RM-X3 | Одно направление: write → events → read |
| Синхронный INSERT в read-таблицу внутри write-транзакции | R-CQRS-SYNC-X1 | Outbox + Kafka + consumer |
Грузить агрегат через основной <X>Repository и маппить в read-DTO | R-CQRS-QRY-X2 | Отдельный <X>ViewRepository с raw select |
| Event payload = SQLAlchemy ORM-модель write-схемы (schema-coupled) | R-CQRS-SYNC-X3 | Версионированный event-contract, независимый от схемы БД |
Маркер Query[R] без enforcement (handler пишет, делает commit) | R-CQRS-TIER-X1 | Query-handler: только read, read-only сессия, без commit |
Куда дальше
- CQRS → раздел 4. Read-model — нормативные формулировки
R-CQRS-RM-*. - Sync через события — как outbox + Kafka доставляет события до read-model.
- Query side — как query-handler читает из read-model через
<X>ViewRepository. - Command side — что command-handler возвращает и почему не read-DTO.
- Уровень и эволюция — когда переходить от lightweight к event-driven read-model.
- Когда CQRS оправдан — критерии для полного CQRS vs lightweight маркеров.