Опирается на правила: R-CQRS-RM-1R-CQRS-RM-4 и R-CQRS-RM-X1R-CQRS-RM-X3 из CQRS Style Guide → раздел 4. Read-model.

Важно знать

  • Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, без join'ов на горячем пути.
  • Хранилище выбирается под паттерн чтения: PG-таблица, Redis, ElasticSearch — не одно «универсальное».
  • Schema read-model независима от write-схемы: один атрибут может присутствовать в нескольких read-DTO под разными именами.
  • Читается через <X>ViewRepository — raw select через DataSource.query(), без транзакции и без блокировок (R-TYPEORM-QRY-4).
  • Обновляется через события (outbox → Kafka → consumer), не синхронно в write-транзакции (R-CQRS-RM-3).
  • Read-model восстановима из write-side: при потере — rebuild-сервис проходит по агрегатам и заново строит проекцию (R-CQRS-RM-4).
  • Source of truth — write-side агрегаты. Read-model — производная.
  • Никакой бизнес-логики в read-model (CHECK-инварианты, триггеры). Никакого bidirectional sync (R-CQRS-RM-X1, R-CQRS-RM-X3).

Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет конечный потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. Статья раскрывает раздел 4 CQRS Style Guide в идиомах NestJS / TypeScript.

Где хранить read-model

R-CQRS-RM-1: выбор хранилища — функция паттерна чтения, не предпочтений команды.

Паттерн чтенияХранилищеПочему
Tabular query с пагинацией, фильтром, сортировкойДенормализованная PG-таблицаРеляционка хорошо умеет такой workload, sync через outbox
Тяжёлые aggregations (GROUP BY миллионов строк)PG materialized viewPre-computed, refresh по расписанию или событию
Key-lookup hot-keys (по ID, по короткому ключу)RedisSub-millisecond latency, горизонтальное масштабирование
Full-text search, multi-field фильтры с relevanceElasticSearch / OpenSearchInverted index, ranking, faceted search

PG-таблица — дефолт

Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.

CREATE TABLE order_summary (
    order_id        BIGINT PRIMARY KEY,
    customer_id     BIGINT NOT NULL,
    customer_name   TEXT NOT NULL,
    customer_email  TEXT NOT NULL,
    status          TEXT NOT NULL,
    item_count      INTEGER NOT NULL,
    total_amount    NUMERIC(19,4) NOT NULL,
    currency        TEXT NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    confirmed_at    TIMESTAMPTZ,
    shipped_at      TIMESTAMPTZ,
    updated_at      TIMESTAMPTZ NOT NULL,
    version         BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer    ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);

Поле version — для idempotent UPDATE в consumer'е (см. Sync через события).

PG materialized view — для тяжёлых aggregations

Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, хранится заранее:

CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
    p.product_id,
    p.name,
    DATE(oi.created_at)              AS day,
    SUM(oi.quantity * oi.unit_price) AS revenue,
    COUNT(DISTINCT o.id)             AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);

CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);

Refresh — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (cron job в NestJS через @nestjs/schedule) или по событию OrderConfirmed.

Redis — для hot-key lookup

Проекция Customer → ActiveSubscriptionPlan — читается на каждый запрос:

// adapters/out/persistence/redis-subscription-view.repository.ts
@Injectable()
export class RedisSubscriptionViewRepository implements SubscriptionViewRepository {
  constructor(@Inject(REDIS_CLIENT) private readonly redis: Redis) {}

  async findByCustomer(customerId: CustomerId): Promise<SubscriptionPlan | null> {
    const raw = await this.redis.get(`customer:${customerId.value}:plan`);
    return raw ? JSON.parse(raw) as SubscriptionPlan : null;
  }

  async upsert(customerId: CustomerId, plan: SubscriptionPlan): Promise<void> {
    await this.redis.set(
      `customer:${customerId.value}:plan`,
      JSON.stringify(plan),
      'EX', 3600,
    );
  }
}

Consumer на SubscriptionUpdated вызывает upsert. Разница с обычным кешем: read-model в Redis — источник ответа, не fallback.

Поиск по описаниям Product, фильтры по 20+ атрибутам с relevance:

// adapters/out/search/elasticsearch-product-view.repository.ts
@Injectable()
export class ElasticsearchProductViewRepository implements ProductSearchRepository {
  constructor(@Inject(ES_CLIENT) private readonly es: Client) {}

  async search(params: ProductSearchQuery): Promise<ProductSearchResult[]> {
    const { hits } = await this.es.search({
      index: 'products',
      query: {
        bool: {
          must: [{ match: { name: params.q } }],
          filter: [
            ...(params.minRating ? [{ range: { rating: { gte: params.minRating } } }] : []),
            ...(params.inStock   ? [{ term:  { in_stock: true } }]                   : []),
          ],
        },
      },
    });
    return hits.hits.map(h => toProductSearchResult(h._source));
  }
}

Consumer'ы на ProductCreated, ProductPriceChanged, StockUpdated обновляют документ в индексе.

Schema независимая от write-стороны

R-CQRS-RM-2: read-схема и read-DTO продиктованы потребителем, не агрегатом.

write-схема:                          read-схема (order_summary):
  order(id, customer_id, status)        order_summary(
  order_item(order_id, qty, price)         order_id,
  customer(id, name, email)               customer_name,   ← денормализовано из customer
                                          customer_email,  ← денормализовано из customer
                                          status,
                                          item_count       ← pre-computed из order_item
                                       )

В TypeScript read-DTO — readonly-класс или Object.freeze():

// core/order/port/out/view/order-summary.view.ts
export class OrderSummaryView {
  constructor(
    readonly orderId: string,
    readonly customerId: string,
    readonly customerName: string,
    readonly status: string,
    readonly itemCount: number,
    readonly totalAmount: string,
    readonly currency: string,
    readonly createdAt: Date,
    readonly confirmedAt: Date | null,
  ) {}
}

export function toOrderSummaryView(row: Record<string, unknown>): OrderSummaryView {
  return new OrderSummaryView(
    String(row['order_id']),
    String(row['customer_id']),
    String(row['customer_name']),
    String(row['status']),
    Number(row['item_count']),
    String(row['total_amount']),
    String(row['currency']),
    new Date(row['created_at'] as string),
    row['confirmed_at'] ? new Date(row['confirmed_at'] as string) : null,
  );
}

ViewRepository — raw select без транзакции

R-CQRS-QRY-2, R-CQRS-TIER-3: read-side использует отдельный <X>ViewRepository, который читает через DataSource.query() без транзакции, без relations, без lock (R-TYPEORM-QRY-4).

// core/order/port/out/order-view.repository.ts
export interface OrderViewRepository {
  summary(orderId: string): Promise<OrderSummaryView | null>;
  listByCustomer(customerId: string, limit: number, offset: number): Promise<OrderSummaryView[]>;
}

export const ORDER_VIEW_REPOSITORY = Symbol('ORDER_VIEW_REPOSITORY');
// adapters/out/persistence/typeorm-order-view.repository.ts
@Injectable()
export class TypeOrmOrderViewRepository implements OrderViewRepository {
  constructor(private readonly dataSource: DataSource) {}

  async summary(orderId: string): Promise<OrderSummaryView | null> {
    const rows = await this.dataSource.query(
      `SELECT order_id, customer_id, customer_name, customer_email,
              status, item_count, total_amount, currency,
              created_at, confirmed_at
         FROM order_summary
        WHERE order_id = $1`,
      [orderId],
    );
    return rows[0] ? toOrderSummaryView(rows[0]) : null;
  }

  async listByCustomer(customerId: string, limit: number, offset: number): Promise<OrderSummaryView[]> {
    const rows = await this.dataSource.query(
      `SELECT order_id, customer_id, customer_name, customer_email,
              status, item_count, total_amount, currency,
              created_at, confirmed_at
         FROM order_summary
        WHERE customer_id = $1
        ORDER BY created_at DESC
        LIMIT $2 OFFSET $3`,
      [customerId, limit, offset],
    );
    return rows.map(toOrderSummaryView);
  }
}

Query-handler инжектирует ORDER_VIEW_REPOSITORY, не вызывает доменных методов агрегата:

// core/order/usecase/get-order-summary.handler.ts
@Injectable()
export class GetOrderSummaryHandler implements Handler<GetOrderSummary, OrderSummaryView | null> {
  constructor(
    @Inject(ORDER_VIEW_REPOSITORY) private readonly orderView: OrderViewRepository,
  ) {}

  async execute(query: GetOrderSummary): Promise<OrderSummaryView | null> {
    return this.orderView.summary(query.orderId);
  }
}

Обновление через события — eventual consistency

R-CQRS-RM-3: read-model обновляется только через outbox → Kafka → consumer. Никогда — в write-транзакции.

1. command-handler сохраняет Order, записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = $1, version = version + 1
   WHERE order_id = $2 AND version < $3

Latency в стационарном режиме — 100ms–1s. Это архитектурно ожидаемо: UI должен понимать eventual consistency.

Почему не synchronous UPDATE в той же транзакции:

  • Read-model теряет decoupling. ALTER TABLE order_summary блокирует write-транзакции.
  • При rollback write-агрегата order_summary уже могла измениться (если consumer внешний), и это расхождение не откатить.
  • Cross-DB synchronous sync невозможен без 2PC, который запрещён.

Подробно — в Sync через события.

Read-model восстановима из write-side

R-CQRS-RM-4: для каждой read-model должен существовать скрипт rebuild'а, который проходит по write-side агрегатам и заново строит проекцию.

// core/order/service/order-summary-rebuilder.ts
@Injectable()
export class OrderSummaryRebuilder {
  constructor(
    @Inject(ORDER_REPOSITORY)      private readonly orders: OrderRepository,
    @Inject(ORDER_VIEW_REPOSITORY) private readonly orderView: OrderViewRepository,
  ) {}

  async rebuildAll(): Promise<void> {
    let lastId = 0n;
    const batchSize = 500;

    while (true) {
      const batch = await this.orders.findAllAfter(lastId, batchSize);
      if (batch.length === 0) break;

      const rows = batch.map(order => this.toSummaryRow(order));
      await this.orderView.upsertBatch(rows);

      lastId = batch[batch.length - 1].id.value;
    }
  }

  private toSummaryRow(order: Order): OrderSummaryUpsert {
    return {
      orderId:       order.id.value.toString(),
      customerId:    order.customerId.value.toString(),
      customerName:  order.snapshot().customerName,
      customerEmail: order.snapshot().customerEmail,
      status:        order.status,
      itemCount:     order.items.length,
      totalAmount:   order.totalAmount.amount.toFixed(4),
      currency:      order.totalAmount.currency,
      createdAt:     order.createdAt,
      confirmedAt:   order.confirmedAt ?? null,
      updatedAt:     new Date(),
      version:       0n,
    };
  }
}

Rebuild-сервис применяется в трёх сценариях:

  1. Disaster recovery. Read-model потеряна (отказ Redis cluster, drop в ElasticSearch, миграция).
  2. Bootstrap нового read-store. Добавляется ElasticSearch — он пустой, надо загнать existing-данные.
  3. Структурная миграция read-схемы. Новое поле в order_summary — для старых записей rebuild дозаполнит.

Без rebuild-скрипта read-model становится первичным хранилищем — что нарушает R-CQRS-RM-X2.

Что запрещено

АнтипаттернПравилоЧто взамен
CHECK-constraint бизнес-инварианта в read-таблицеR-CQRS-RM-X1Инвариант в агрегате; read-model — только проекция
Read-model как единственный источник правды (невосстановимая)R-CQRS-RM-X2Source of truth — write-side; rebuild-скрипт обязателен
Bidirectional sync: read-handler пишет в write-sideR-CQRS-RM-X3Одно направление: write → events → read
Синхронный INSERT в read-таблицу внутри write-транзакцииR-CQRS-SYNC-X1Outbox + Kafka + consumer
Грузить агрегат через основной <X>Repository и маппить в read-DTOR-CQRS-QRY-X2Отдельный <X>ViewRepository с raw select
Event payload = TypeORM-Entity write-схемы (schema-coupled)R-CQRS-SYNC-X3Версионированный event-contract, независимый от схемы БД
Маркеры Query<R> без enforcement (handler пишет, использует транзакцию)R-CQRS-TIER-X1Query-handler: только read, без транзакции, без write

Куда дальше

  • CQRS → раздел 4. Read-model — нормативные формулировки R-CQRS-RM-*.
  • Sync через события — как outbox + Kafka доставляет события до read-model.
  • Query side — как query-handler читает из read-model через <X>ViewRepository.
  • Command side — что command-handler возвращает и почему не read-DTO.
  • Уровень и эволюция — когда переходить от lightweight к event-driven read-model.
  • Когда CQRS оправдан — критерии для полного CQRS vs lightweight маркеров.