← назад к разделу

Классический способ читать данные — загрузить объект из базы и сделать несколько JOIN, чтобы собрать нужный ответ. Когда запросов тысячи в секунду, а данные разбросаны по десяткам таблиц, такой подход начинает тормозить. Read-model — это другая идея: данные заранее складываются именно в той форме, в которой их хочет получить потребитель. Один SELECT — готовый ответ без JOIN.

Что такое read-model

Представьте магазин с заказами. Для отображения списка заказов покупателя нужны данные из трёх таблиц: заказ, позиции заказа и профиль покупателя. При обычном подходе каждый запрос делает несколько объединений таблиц.

Read-model — это отдельная таблица (или другое хранилище), в которой всё уже собрано. Каждый раз, когда что-то меняется на стороне записи, read-model обновляется. Когда пользователь запрашивает данные — читаем напрямую из этой готовой проекции.

Главный принцип: схема read-model продиктована потребителем, а не агрегатом. Ей не обязательно повторять структуру write-стороны.

write-схема:                          read-схема (order_summary):
  order(id, customer_id, status)        order_summary(
  order_item(order_id, qty, price)         order_id,
  customer(id, name, email)               customer_name,   ← из customer
                                          customer_email,  ← из customer
                                          status,
                                          item_count       ← посчитано из order_item
                                       )

Source of truth — всегда write-сторона. Read-model — это производное, которое можно восстановить заново.

Где хранить read-model

Хранилище выбирается под характер запросов, а не по личным предпочтениям.

Что нужноХранилище
Таблица с пагинацией, фильтрами, сортировкойДенормализованная PG-таблица
Тяжёлые агрегации по миллионам строкPG materialized view
Горячие lookup по ID или короткому ключуRedis
Полнотекстовый поиск, многополевые фильтрыElasticSearch / OpenSearch

PG-таблица — первый выбор

Денормализованная таблица в той же базе данных — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.

CREATE TABLE order_summary (
    order_id        BIGINT PRIMARY KEY,
    customer_id     BIGINT NOT NULL,
    customer_name   TEXT NOT NULL,
    customer_email  TEXT NOT NULL,
    status          TEXT NOT NULL,
    item_count      INTEGER NOT NULL,
    total_amount    NUMERIC(19,4) NOT NULL,
    currency        TEXT NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    confirmed_at    TIMESTAMPTZ,
    updated_at      TIMESTAMPTZ NOT NULL,
    version         BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer    ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);

Поле version помогает потребителю событий не применять устаревшее обновление дважды.

PG materialized view — для предподсчитанных агрегаций

Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, зато читается часто. Materialized view считает её заранее и хранит как обычную таблицу:

CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
    p.product_id,
    p.name,
    DATE(oi.created_at)              AS day,
    SUM(oi.quantity * oi.unit_price) AS revenue,
    COUNT(DISTINCT o.id)             AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);

CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);

Обновление — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию или по событию OrderConfirmed.

Redis — для горячих lookup

Если одни и те же данные читаются на каждый запрос пользователя (например, тарифный план подписки), они хорошо ложатся в Redis:

# adapters/out/persistence/redis_subscription_view_repository.py
import json
from dataclasses import dataclass
from redis.asyncio import Redis

@dataclass(frozen=True)
class SubscriptionPlan:
    plan: str
    expires_at: str

class RedisSubscriptionViewRepository:
    def __init__(self, redis: Redis) -> None:
        self._redis = redis

    async def find_by_customer(self, customer_id: int) -> SubscriptionPlan | None:
        raw = await self._redis.get(f"customer:{customer_id}:plan")
        if raw is None:
            return None
        data = json.loads(raw)
        return SubscriptionPlan(plan=data["plan"], expires_at=data["expires_at"])

    async def upsert(self, customer_id: int, plan: SubscriptionPlan) -> None:
        await self._redis.set(
            f"customer:{customer_id}:plan",
            json.dumps({"plan": plan.plan, "expires_at": plan.expires_at}),
            ex=3600,
        )

Отличие от обычного кеша: Redis здесь — основное хранилище ответа, не резервный вариант при промахе кеша.

ElasticSearch — для полнотекстового поиска

Поиск по описаниям с фильтрами по десяткам атрибутов и ранжированием — задача для инвертированного индекса:

# adapters/out/search/es_product_view_repository.py
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch

@dataclass(frozen=True)
class ProductSearchResult:
    product_id: int
    name: str
    price: int
    in_stock: bool
    rating: float

class ElasticsearchProductViewRepository:
    def __init__(self, es: AsyncElasticsearch) -> None:
        self._es = es

    async def search(
        self,
        q: str,
        min_rating: float | None = None,
        in_stock: bool | None = None,
    ) -> list[ProductSearchResult]:
        must = [{"match": {"name": q}}]
        filters = []
        if min_rating is not None:
            filters.append({"range": {"rating": {"gte": min_rating}}})
        if in_stock is not None:
            filters.append({"term": {"in_stock": in_stock}})

        resp = await self._es.search(
            index="products",
            query={"bool": {"must": must, "filter": filters}},
        )
        return [_to_result(hit["_source"]) for hit in resp["hits"]["hits"]]

Как читать из read-model: ViewRepository

Для чтения из read-model создаётся отдельный ViewRepository — он работает через чистый SQL в read-only сессии. Никаких commit, никакой блокировки FOR UPDATE, никакой загрузки агрегата через ORM.

Сначала — интерфейс порта:

# core/order/port/out/order_view_repository.py
from typing import Protocol

class OrderViewRepository(Protocol):
    async def summary(self, order_id: int) -> OrderSummaryView | None: ...
    async def list_by_customer(
        self, customer_id: int, limit: int, offset: int
    ) -> list[OrderSummaryView]: ...
    async def upsert_batch(self, rows: list[OrderSummaryUpsert]) -> None: ...

Реализация через SQLAlchemy с text():

# adapters/out/persistence/sqlalchemy_order_view_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class SqlAlchemyOrderViewRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def summary(self, order_id: int) -> OrderSummaryView | None:
        result = await self._session.execute(
            text("""
                SELECT order_id, customer_id, customer_name, customer_email,
                       status, item_count, total_amount, currency,
                       created_at, confirmed_at
                  FROM order_summary
                 WHERE order_id = :order_id
            """),
            {"order_id": order_id},
        )
        row = result.mappings().one_or_none()
        return to_order_summary_view(dict(row)) if row else None

    async def list_by_customer(
        self, customer_id: int, limit: int, offset: int
    ) -> list[OrderSummaryView]:
        result = await self._session.execute(
            text("""
                SELECT order_id, customer_id, customer_name, customer_email,
                       status, item_count, total_amount, currency,
                       created_at, confirmed_at
                  FROM order_summary
                 WHERE customer_id = :customer_id
                 ORDER BY created_at DESC
                 LIMIT :limit OFFSET :offset
            """),
            {"customer_id": customer_id, "limit": limit, "offset": offset},
        )
        return [to_order_summary_view(dict(r)) for r in result.mappings()]

Read-only сессия создаётся через отдельный движок — без autocommit, commit вызывать нельзя:

# infrastructure/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

read_engine = create_async_engine(READ_DSN, pool_size=10)
ReadSession: async_sessionmaker[AsyncSession] = async_sessionmaker(
    read_engine, expire_on_commit=False
)

Query-handler просто вызывает репозиторий — никакой доменной логики:

# core/order/usecase/get_order_summary.py
from dataclasses import dataclass

@dataclass(frozen=True)
class GetOrderSummary:
    order_id: int

class GetOrderSummaryHandler:
    def __init__(self, order_view: OrderViewRepository) -> None:
        self._order_view = order_view

    async def handle(self, query: GetOrderSummary) -> OrderSummaryView | None:
        return await self._order_view.summary(query.order_id)

Как обновляется read-model: через события

Read-model нельзя обновлять синхронно внутри той же транзакции, которая сохраняет агрегат. Это нарушает независимость сторон: изменение схемы read-model начнёт влиять на write-транзакции, а откат агрегата может не откатить уже изменённую проекцию.

Правильный путь — через события:

1. command-handler сохраняет Order,
   записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary
   SET status = 'CONFIRMED', confirmed_at = :confirmed_at, version = version + 1
   WHERE order_id = :order_id AND version < :event_version

Задержка в нормальном режиме — 100 мс–1 с. Это ожидаемое поведение: UI должен знать, что данные обновляются с небольшой задержкой. Называется это eventual consistency (согласованность в конечном счёте).

Как восстановить read-model с нуля

Read-model — производное. Если она потерялась (отказ Redis, удаление таблицы, миграция), её можно построить заново, пройдя по write-side агрегатам.

Для этого делают отдельный сервис-перестройщик:

# core/order/service/order_summary_rebuilder.py
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

class OrderSummaryRebuilder:
    def __init__(
        self,
        orders: OrderRepository,
        customers: CustomerRepository,
        order_view: OrderViewRepository,
    ) -> None:
        self._orders = orders
        self._customers = customers
        self._order_view = order_view

    async def rebuild_all(self) -> None:
        last_id = 0
        batch_size = 500

        while True:
            batch = await self._orders.find_all_after(last_id, batch_size)
            if not batch:
                break

            customer_ids = {order.customer_id.value for order in batch}
            customers = await self._customers.find_by_ids(customer_ids)
            customer_map = {c.id.value: c for c in customers}

            rows = [
                self._to_summary_row(order, customer_map[order.customer_id.value])
                for order in batch
            ]
            await self._order_view.upsert_batch(rows)

            last_id = batch[-1].id.value
            logger.info("rebuild progress: last_id=%d", last_id)

Когда это нужно:

  • Восстановление после сбоя — Redis-кластер упал, ElasticSearch-индекс был удалён.
  • Миграция в новое хранилище — добавляете ElasticSearch, который пока пустой, и нужно залить в него исторические данные.
  • Структурное изменение read-схемы — добавили новое поле в order_summary и хотите заполнить его для старых записей.

Если перестройщика нет, read-model превращается в первичный источник данных — что недопустимо.

Частые ошибки

Бизнес-инварианты в read-таблице. Если в order_summary добавить CHECK-ограничение вроде «сумма не может быть отрицательной» — это означает, что read-model начинает диктовать правила бизнеса. Инварианты живут только в агрегате на write-стороне.

Read-model как единственный источник правды. Если write-side не хранит достаточно данных для перестройки, read-model становится незаменимой — и любая её потеря катастрофична. Source of truth — всегда write-side.

Синхронная запись в read-model из write-транзакции. Удобно в краткосрочной перспективе, но связывает две стороны накрепко. Стоит одной стороне поменять схему — другая ломается.

Загрузка агрегата через основной репозиторий для query-запросов. ORM загружает граф объектов, делает лишние запросы, применяет ленивую загрузку. Для чтения — отдельный ViewRepository с простым SELECT.

Коротко

  • Read-model — это проекция данных в форме, удобной для чтения: один SELECT без JOIN возвращает готовый ответ.
  • Схема read-model продиктована потребителем, а не write-стороной; они могут сильно отличаться.
  • Хранилище выбирается под характер запросов: PG-таблица для табличных запросов, materialized view для агрегаций, Redis для горячих lookup, ElasticSearch для полнотекстового поиска.
  • Чтение — через ViewRepository с read-only AsyncSession и чистым SQL, без commit.
  • Обновление — только через события (outbox → Kafka → consumer), не синхронно внутри write-транзакции.
  • Read-model всегда восстановима из write-стороны — для этого нужен перестройщик.
  • Source of truth — write-side агрегаты; read-model производная.

Что почитать дальше