Классический способ читать данные — загрузить объект из базы и сделать несколько JOIN, чтобы собрать нужный ответ. Когда запросов тысячи в секунду, а данные разбросаны по десяткам таблиц, такой подход начинает тормозить. Read-model — это другая идея: данные заранее складываются именно в той форме, в которой их хочет получить потребитель. Один SELECT — готовый ответ без JOIN.
Что такое read-model
Представьте магазин с заказами. Для отображения списка заказов покупателя нужны данные из трёх таблиц: заказ, позиции заказа и профиль покупателя. При обычном подходе каждый запрос делает несколько объединений таблиц.
Read-model — это отдельная таблица (или другое хранилище), в которой всё уже собрано. Каждый раз, когда что-то меняется на стороне записи, read-model обновляется. Когда пользователь запрашивает данные — читаем напрямую из этой готовой проекции.
Главный принцип: схема read-model продиктована потребителем, а не агрегатом. Ей не обязательно повторять структуру write-стороны.
write-схема: read-схема (order_summary):
order(id, customer_id, status) order_summary(
order_item(order_id, qty, price) order_id,
customer(id, name, email) customer_name, ← из customer
customer_email, ← из customer
status,
item_count ← посчитано из order_item
)
Source of truth — всегда write-сторона. Read-model — это производное, которое можно восстановить заново.
Где хранить read-model
Хранилище выбирается под характер запросов, а не по личным предпочтениям.
| Что нужно | Хранилище |
|---|---|
| Таблица с пагинацией, фильтрами, сортировкой | Денормализованная PG-таблица |
| Тяжёлые агрегации по миллионам строк | PG materialized view |
| Горячие lookup по ID или короткому ключу | Redis |
| Полнотекстовый поиск, многополевые фильтры | ElasticSearch / OpenSearch |
PG-таблица — первый выбор
Денормализованная таблица в той же базе данных — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.
CREATE TABLE order_summary (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
customer_name TEXT NOT NULL,
customer_email TEXT NOT NULL,
status TEXT NOT NULL,
item_count INTEGER NOT NULL,
total_amount NUMERIC(19,4) NOT NULL,
currency TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
confirmed_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
version BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);
Поле version помогает потребителю событий не применять устаревшее обновление дважды.
PG materialized view — для предподсчитанных агрегаций
Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, зато читается часто. Materialized view считает её заранее и хранит как обычную таблицу:
CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
p.product_id,
p.name,
DATE(oi.created_at) AS day,
SUM(oi.quantity * oi.unit_price) AS revenue,
COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);
CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);
Обновление — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию или по событию OrderConfirmed.
Redis — для горячих lookup
Если одни и те же данные читаются на каждый запрос пользователя (например, тарифный план подписки), они хорошо ложатся в Redis:
# adapters/out/persistence/redis_subscription_view_repository.py
import json
from dataclasses import dataclass
from redis.asyncio import Redis
@dataclass(frozen=True)
class SubscriptionPlan:
plan: str
expires_at: str
class RedisSubscriptionViewRepository:
def __init__(self, redis: Redis) -> None:
self._redis = redis
async def find_by_customer(self, customer_id: int) -> SubscriptionPlan | None:
raw = await self._redis.get(f"customer:{customer_id}:plan")
if raw is None:
return None
data = json.loads(raw)
return SubscriptionPlan(plan=data["plan"], expires_at=data["expires_at"])
async def upsert(self, customer_id: int, plan: SubscriptionPlan) -> None:
await self._redis.set(
f"customer:{customer_id}:plan",
json.dumps({"plan": plan.plan, "expires_at": plan.expires_at}),
ex=3600,
)
Отличие от обычного кеша: Redis здесь — основное хранилище ответа, не резервный вариант при промахе кеша.
ElasticSearch — для полнотекстового поиска
Поиск по описаниям с фильтрами по десяткам атрибутов и ранжированием — задача для инвертированного индекса:
# adapters/out/search/es_product_view_repository.py
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch
@dataclass(frozen=True)
class ProductSearchResult:
product_id: int
name: str
price: int
in_stock: bool
rating: float
class ElasticsearchProductViewRepository:
def __init__(self, es: AsyncElasticsearch) -> None:
self._es = es
async def search(
self,
q: str,
min_rating: float | None = None,
in_stock: bool | None = None,
) -> list[ProductSearchResult]:
must = [{"match": {"name": q}}]
filters = []
if min_rating is not None:
filters.append({"range": {"rating": {"gte": min_rating}}})
if in_stock is not None:
filters.append({"term": {"in_stock": in_stock}})
resp = await self._es.search(
index="products",
query={"bool": {"must": must, "filter": filters}},
)
return [_to_result(hit["_source"]) for hit in resp["hits"]["hits"]]
Как читать из read-model: ViewRepository
Для чтения из read-model создаётся отдельный ViewRepository — он работает через чистый SQL в read-only сессии. Никаких commit, никакой блокировки FOR UPDATE, никакой загрузки агрегата через ORM.
Сначала — интерфейс порта:
# core/order/port/out/order_view_repository.py
from typing import Protocol
class OrderViewRepository(Protocol):
async def summary(self, order_id: int) -> OrderSummaryView | None: ...
async def list_by_customer(
self, customer_id: int, limit: int, offset: int
) -> list[OrderSummaryView]: ...
async def upsert_batch(self, rows: list[OrderSummaryUpsert]) -> None: ...
Реализация через SQLAlchemy с text():
# adapters/out/persistence/sqlalchemy_order_view_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class SqlAlchemyOrderViewRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def summary(self, order_id: int) -> OrderSummaryView | None:
result = await self._session.execute(
text("""
SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE order_id = :order_id
"""),
{"order_id": order_id},
)
row = result.mappings().one_or_none()
return to_order_summary_view(dict(row)) if row else None
async def list_by_customer(
self, customer_id: int, limit: int, offset: int
) -> list[OrderSummaryView]:
result = await self._session.execute(
text("""
SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE customer_id = :customer_id
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
"""),
{"customer_id": customer_id, "limit": limit, "offset": offset},
)
return [to_order_summary_view(dict(r)) for r in result.mappings()]
Read-only сессия создаётся через отдельный движок — без autocommit, commit вызывать нельзя:
# infrastructure/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
read_engine = create_async_engine(READ_DSN, pool_size=10)
ReadSession: async_sessionmaker[AsyncSession] = async_sessionmaker(
read_engine, expire_on_commit=False
)
Query-handler просто вызывает репозиторий — никакой доменной логики:
# core/order/usecase/get_order_summary.py
from dataclasses import dataclass
@dataclass(frozen=True)
class GetOrderSummary:
order_id: int
class GetOrderSummaryHandler:
def __init__(self, order_view: OrderViewRepository) -> None:
self._order_view = order_view
async def handle(self, query: GetOrderSummary) -> OrderSummaryView | None:
return await self._order_view.summary(query.order_id)
Как обновляется read-model: через события
Read-model нельзя обновлять синхронно внутри той же транзакции, которая сохраняет агрегат. Это нарушает независимость сторон: изменение схемы read-model начнёт влиять на write-транзакции, а откат агрегата может не откатить уже изменённую проекцию.
Правильный путь — через события:
1. command-handler сохраняет Order,
записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary
SET status = 'CONFIRMED', confirmed_at = :confirmed_at, version = version + 1
WHERE order_id = :order_id AND version < :event_version
Задержка в нормальном режиме — 100 мс–1 с. Это ожидаемое поведение: UI должен знать, что данные обновляются с небольшой задержкой. Называется это eventual consistency (согласованность в конечном счёте).
Как восстановить read-model с нуля
Read-model — производное. Если она потерялась (отказ Redis, удаление таблицы, миграция), её можно построить заново, пройдя по write-side агрегатам.
Для этого делают отдельный сервис-перестройщик:
# core/order/service/order_summary_rebuilder.py
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class OrderSummaryRebuilder:
def __init__(
self,
orders: OrderRepository,
customers: CustomerRepository,
order_view: OrderViewRepository,
) -> None:
self._orders = orders
self._customers = customers
self._order_view = order_view
async def rebuild_all(self) -> None:
last_id = 0
batch_size = 500
while True:
batch = await self._orders.find_all_after(last_id, batch_size)
if not batch:
break
customer_ids = {order.customer_id.value for order in batch}
customers = await self._customers.find_by_ids(customer_ids)
customer_map = {c.id.value: c for c in customers}
rows = [
self._to_summary_row(order, customer_map[order.customer_id.value])
for order in batch
]
await self._order_view.upsert_batch(rows)
last_id = batch[-1].id.value
logger.info("rebuild progress: last_id=%d", last_id)
Когда это нужно:
- Восстановление после сбоя — Redis-кластер упал, ElasticSearch-индекс был удалён.
- Миграция в новое хранилище — добавляете ElasticSearch, который пока пустой, и нужно залить в него исторические данные.
- Структурное изменение read-схемы — добавили новое поле в
order_summaryи хотите заполнить его для старых записей.
Если перестройщика нет, read-model превращается в первичный источник данных — что недопустимо.
Частые ошибки
Бизнес-инварианты в read-таблице. Если в order_summary добавить CHECK-ограничение вроде «сумма не может быть отрицательной» — это означает, что read-model начинает диктовать правила бизнеса. Инварианты живут только в агрегате на write-стороне.
Read-model как единственный источник правды. Если write-side не хранит достаточно данных для перестройки, read-model становится незаменимой — и любая её потеря катастрофична. Source of truth — всегда write-side.
Синхронная запись в read-model из write-транзакции. Удобно в краткосрочной перспективе, но связывает две стороны накрепко. Стоит одной стороне поменять схему — другая ломается.
Загрузка агрегата через основной репозиторий для query-запросов. ORM загружает граф объектов, делает лишние запросы, применяет ленивую загрузку. Для чтения — отдельный ViewRepository с простым SELECT.
Коротко
- Read-model — это проекция данных в форме, удобной для чтения: один
SELECTбезJOINвозвращает готовый ответ. - Схема read-model продиктована потребителем, а не write-стороной; они могут сильно отличаться.
- Хранилище выбирается под характер запросов: PG-таблица для табличных запросов, materialized view для агрегаций, Redis для горячих lookup, ElasticSearch для полнотекстового поиска.
- Чтение — через
ViewRepositoryс read-onlyAsyncSessionи чистым SQL, безcommit. - Обновление — только через события (outbox → Kafka → consumer), не синхронно внутри write-транзакции.
- Read-model всегда восстановима из write-стороны — для этого нужен перестройщик.
- Source of truth — write-side агрегаты; read-model производная.
Что почитать дальше
- Синхронизация через события — как outbox и Kafka доставляют события до read-model.
- Query side — как query-handler читает из read-model.
- Command side — что возвращает command-handler и почему не read-DTO.
- Уровень и эволюция CQRS — когда переходить от простого разделения к event-driven read-model.