Опирается на правила: R-CQRS-RM-1R-CQRS-RM-4 и R-CQRS-RM-X1R-CQRS-RM-X3 из CQRS Style Guide → раздел 4. Read-model.

Важно знать

  • Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, без join'ов на горячем пути.
  • Хранилище выбирается под паттерн чтения: PG-таблица, Redis, ElasticSearch — не одно «универсальное».
  • Schema read-model независима от write-схемы: один атрибут может присутствовать в нескольких read-DTO под разными именами.
  • Читается через <X>ViewRepository — raw text() select через read-only AsyncSession без commit (R-SQLA-QRY-5).
  • Обновляется через события (outbox → Kafka → consumer), не синхронно в write-транзакции (R-CQRS-RM-3).
  • Read-model восстановима из write-side: при потере — rebuilder проходит по агрегатам и заново строит проекцию (R-CQRS-RM-4).
  • Source of truth — write-side агрегаты. Read-model — производная.
  • Никакой бизнес-логики в read-model (CHECK-инварианты, триггеры). Никакого bidirectional sync (R-CQRS-RM-X1, R-CQRS-RM-X3).

Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет конечный потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. Статья раскрывает раздел 4 CQRS Style Guide в идиомах FastAPI / SQLAlchemy.

Где хранить read-model

R-CQRS-RM-1: выбор хранилища — функция паттерна чтения, не предпочтений команды.

Паттерн чтенияХранилищеПочему
Tabular query с пагинацией, фильтром, сортировкойДенормализованная PG-таблицаРеляционка хорошо умеет такой workload, sync через outbox
Тяжёлые aggregations (GROUP BY миллионов строк)PG materialized viewPre-computed, refresh по расписанию или событию
Key-lookup hot-keys (по ID, по короткому ключу)RedisSub-millisecond latency, горизонтальное масштабирование
Full-text search, multi-field фильтры с relevanceElasticSearch / OpenSearchInverted index, ranking, faceted search

PG-таблица — дефолт

Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.

CREATE TABLE order_summary (
    order_id        BIGINT PRIMARY KEY,
    customer_id     BIGINT NOT NULL,
    customer_name   TEXT NOT NULL,
    customer_email  TEXT NOT NULL,
    status          TEXT NOT NULL,
    item_count      INTEGER NOT NULL,
    total_amount    NUMERIC(19,4) NOT NULL,
    currency        TEXT NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    confirmed_at    TIMESTAMPTZ,
    shipped_at      TIMESTAMPTZ,
    updated_at      TIMESTAMPTZ NOT NULL,
    version         BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer    ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);

Поле version — для idempotent UPDATE в consumer'е (см. Sync через события).

PG materialized view — для тяжёлых aggregations

Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, хранится заранее:

CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
    p.product_id,
    p.name,
    DATE(oi.created_at)              AS day,
    SUM(oi.quantity * oi.unit_price) AS revenue,
    COUNT(DISTINCT o.id)             AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);

CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);

Refresh — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (APScheduler или Celery Beat) или по событию OrderConfirmed.

Redis — для hot-key lookup

Проекция Customer → ActiveSubscriptionPlan — читается на каждый запрос пользователя:

# adapters/out/persistence/redis_subscription_view_repository.py
from dataclasses import dataclass
from redis.asyncio import Redis

@dataclass(frozen=True)
class SubscriptionPlan:
    plan: str
    expires_at: str

class RedisSubscriptionViewRepository:
    def __init__(self, redis: Redis) -> None:
        self._redis = redis

    async def find_by_customer(self, customer_id: int) -> SubscriptionPlan | None:
        raw = await self._redis.get(f"customer:{customer_id}:plan")
        if raw is None:
            return None
        data = json.loads(raw)
        return SubscriptionPlan(plan=data["plan"], expires_at=data["expires_at"])

    async def upsert(self, customer_id: int, plan: SubscriptionPlan) -> None:
        await self._redis.set(
            f"customer:{customer_id}:plan",
            json.dumps({"plan": plan.plan, "expires_at": plan.expires_at}),
            ex=3600,
        )

Consumer на SubscriptionUpdated вызывает upsert. Разница с обычным кешем: read-model в Redis — источник ответа, не fallback.

Поиск по описаниям Product, фильтры по 20+ атрибутам с relevance:

# adapters/out/search/es_product_view_repository.py
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch

@dataclass(frozen=True)
class ProductSearchResult:
    product_id: int
    name: str
    price: int
    in_stock: bool
    rating: float

class ElasticsearchProductViewRepository:
    def __init__(self, es: AsyncElasticsearch) -> None:
        self._es = es

    async def search(
        self,
        q: str,
        min_rating: float | None = None,
        in_stock: bool | None = None,
    ) -> list[ProductSearchResult]:
        must = [{"match": {"name": q}}]
        filters = []
        if min_rating is not None:
            filters.append({"range": {"rating": {"gte": min_rating}}})
        if in_stock is not None:
            filters.append({"term": {"in_stock": in_stock}})

        resp = await self._es.search(
            index="products",
            query={"bool": {"must": must, "filter": filters}},
        )
        return [_to_result(hit["_source"]) for hit in resp["hits"]["hits"]]

Consumer'ы на ProductCreated, ProductPriceChanged, StockUpdated обновляют документ в индексе.

Schema независимая от write-стороны

R-CQRS-RM-2: read-схема и read-DTO продиктованы потребителем, не агрегатом.

write-схема:                          read-схема (order_summary):
  order(id, customer_id, status)        order_summary(
  order_item(order_id, qty, price)         order_id,
  customer(id, name, email)               customer_name,   ← денормализовано из customer
                                          customer_email,  ← денормализовано из customer
                                          status,
                                          item_count       ← pre-computed из order_item
                                       )

В Python read-DTO — @dataclass(frozen=True) или Pydantic-модель (model_config = ConfigDict(frozen=True)):

# core/order/port/out/view/order_summary_view.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal

@dataclass(frozen=True)
class OrderSummaryView:
    order_id: int
    customer_id: int
    customer_name: str
    customer_email: str
    status: str
    item_count: int
    total_amount: Decimal
    currency: str
    created_at: datetime
    confirmed_at: datetime | None

def to_order_summary_view(row: dict) -> OrderSummaryView:
    return OrderSummaryView(
        order_id=int(row["order_id"]),
        customer_id=int(row["customer_id"]),
        customer_name=str(row["customer_name"]),
        customer_email=str(row["customer_email"]),
        status=str(row["status"]),
        item_count=int(row["item_count"]),
        total_amount=Decimal(str(row["total_amount"])),
        currency=str(row["currency"]),
        created_at=row["created_at"],
        confirmed_at=row.get("confirmed_at"),
    )

ViewRepository — raw select через read-only сессию

R-CQRS-QRY-2, R-CQRS-TIER-3: read-side использует отдельный <X>ViewRepository, который читает через raw text() select в read-only AsyncSession — без commit, без FOR UPDATE, без eager-load агрегата (R-SQLA-QRY-5).

# core/order/port/out/order_view_repository.py
from typing import Protocol

class OrderViewRepository(Protocol):
    async def summary(self, order_id: int) -> OrderSummaryView | None: ...
    async def list_by_customer(
        self, customer_id: int, limit: int, offset: int
    ) -> list[OrderSummaryView]: ...
# adapters/out/persistence/sqlalchemy_order_view_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class SqlAlchemyOrderViewRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def summary(self, order_id: int) -> OrderSummaryView | None:
        result = await self._session.execute(
            text(
                """
                SELECT order_id, customer_id, customer_name, customer_email,
                       status, item_count, total_amount, currency,
                       created_at, confirmed_at
                  FROM order_summary
                 WHERE order_id = :order_id
                """
            ),
            {"order_id": order_id},
        )
        row = result.mappings().one_or_none()
        return to_order_summary_view(dict(row)) if row else None

    async def list_by_customer(
        self, customer_id: int, limit: int, offset: int
    ) -> list[OrderSummaryView]:
        result = await self._session.execute(
            text(
                """
                SELECT order_id, customer_id, customer_name, customer_email,
                       status, item_count, total_amount, currency,
                       created_at, confirmed_at
                  FROM order_summary
                 WHERE customer_id = :customer_id
                 ORDER BY created_at DESC
                 LIMIT :limit OFFSET :offset
                """
            ),
            {"customer_id": customer_id, "limit": limit, "offset": offset},
        )
        return [to_order_summary_view(dict(r)) for r in result.mappings()]

Read-only сессия создаётся через отдельную фабрику — без autocommit, commit вызывать нельзя:

# infrastructure/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

read_engine = create_async_engine(READ_DSN, pool_size=10)
ReadSession: async_sessionmaker[AsyncSession] = async_sessionmaker(
    read_engine, expire_on_commit=False
)

Query-handler инжектирует OrderViewRepository, не вызывает доменных методов агрегата:

# core/order/usecase/get_order_summary.py
from dataclasses import dataclass

@dataclass(frozen=True)
class GetOrderSummary:
    order_id: int

class GetOrderSummaryHandler:
    def __init__(self, order_view: OrderViewRepository) -> None:
        self._order_view = order_view

    async def handle(self, query: GetOrderSummary) -> OrderSummaryView | None:
        return await self._order_view.summary(query.order_id)

Обновление через события — eventual consistency

R-CQRS-RM-3: read-model обновляется только через outbox → Kafka → consumer. Никогда — в write-транзакции.

1. command-handler сохраняет Order, записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = :confirmed_at, version = version + 1
   WHERE order_id = :order_id AND version < :event_version

Latency в стационарном режиме — 100ms–1s. Это архитектурно ожидаемо: UI должен понимать eventual consistency.

Почему не synchronous UPDATE в той же транзакции:

  • Read-model теряет decoupling. ALTER TABLE order_summary блокирует write-транзакции.
  • При rollback write-агрегата order_summary уже могла измениться (если consumer внешний), и это расхождение не откатить.
  • Cross-DB synchronous sync невозможен без 2PC, который запрещён.

Подробно — в Sync через события.

Read-model восстановима из write-side

R-CQRS-RM-4: для каждой read-model должен существовать скрипт rebuild'а, который проходит по write-side агрегатам и заново строит проекцию.

# core/order/service/order_summary_rebuilder.py
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

class OrderSummaryRebuilder:
    def __init__(
        self,
        orders: OrderRepository,
        order_view: OrderViewRepository,
    ) -> None:
        self._orders = orders
        self._order_view = order_view

    async def rebuild_all(self) -> None:
        last_id = 0
        batch_size = 500

        while True:
            batch = await self._orders.find_all_after(last_id, batch_size)
            if not batch:
                break

            rows = [self._to_summary_row(order) for order in batch]
            await self._order_view.upsert_batch(rows)

            last_id = batch[-1].id.value
            logger.info("rebuild progress: last_id=%d", last_id)

    def _to_summary_row(self, order: Order) -> OrderSummaryUpsert:
        return OrderSummaryUpsert(
            order_id=order.id.value,
            customer_id=order.customer_id.value,
            customer_name=order.snapshot().customer_name,
            customer_email=order.snapshot().customer_email,
            status=order.status.value,
            item_count=len(order.items),
            total_amount=order.total_amount.amount,
            currency=order.total_amount.currency,
            created_at=order.created_at,
            confirmed_at=order.confirmed_at,
            updated_at=datetime.now(tz=timezone.utc),
            version=0,
        )

Rebuild-сервис применяется в трёх сценариях:

  1. Disaster recovery. Read-model потеряна (отказ Redis cluster, drop в ElasticSearch, миграция).
  2. Bootstrap нового read-store. Добавляется ElasticSearch — он пустой, надо загнать existing-данные.
  3. Структурная миграция read-схемы. Новое поле в order_summary — для старых записей rebuild дозаполнит.

Без rebuild-скрипта read-model становится первичным хранилищем — что нарушает R-CQRS-RM-X2.

Что запрещено

АнтипаттернПравилоЧто взамен
CHECK-constraint бизнес-инварианта в read-таблицеR-CQRS-RM-X1Инвариант в агрегате; read-model — только проекция
Read-model как единственный источник правды (невосстановимая)R-CQRS-RM-X2Source of truth — write-side; rebuild-скрипт обязателен
Bidirectional sync: read-handler пишет в write-sideR-CQRS-RM-X3Одно направление: write → events → read
Синхронный INSERT в read-таблицу внутри write-транзакцииR-CQRS-SYNC-X1Outbox + Kafka + consumer
Грузить агрегат через основной <X>Repository и маппить в read-DTOR-CQRS-QRY-X2Отдельный <X>ViewRepository с raw select
Event payload = SQLAlchemy ORM-модель write-схемы (schema-coupled)R-CQRS-SYNC-X3Версионированный event-contract, независимый от схемы БД
Маркер Query[R] без enforcement (handler пишет, делает commit)R-CQRS-TIER-X1Query-handler: только read, read-only сессия, без commit

Куда дальше

  • CQRS → раздел 4. Read-model — нормативные формулировки R-CQRS-RM-*.
  • Sync через события — как outbox + Kafka доставляет события до read-model.
  • Query side — как query-handler читает из read-model через <X>ViewRepository.
  • Command side — что command-handler возвращает и почему не read-DTO.
  • Уровень и эволюция — когда переходить от lightweight к event-driven read-model.
  • Когда CQRS оправдан — критерии для полного CQRS vs lightweight маркеров.