← назад к разделу

CQRS — не переключатель «включить/выключить». Это шкала: берёшь ровно столько, сколько нужно сейчас, и добавляешь следующий уровень по мере роста. Начинать сразу с event-driven архитектуры на молодом сервисе — дорого и бессмысленно.

Разберём четыре ступени: от обычного сервиса до полноценной read-model с Kafka.

Уровень 1: обычный сервис без CQRS

Большинство сервисов начинают отсюда. Один @Injectable() класс, методы для чтения и записи в одном месте, TypeORM работает напрямую.

@Injectable()
export class OrderService {
  constructor(private readonly dataSource: DataSource) {}

  async createOrder(dto: CreateOrderDto): Promise<string> {
    const order = Order.create(dto.customerId, dto.items);
    await this.dataSource.getRepository(OrderEntity).save(order.toEntity());
    return order.id;
  }

  async getOrder(id: string): Promise<OrderDto> {
    const row = await this.dataSource.getRepository(OrderEntity).findOneBy({ id });
    if (!row) throw new OrderNotFoundError(id);
    return toOrderDto(row);
  }
}

Когда это нормально: простые CRUD-сервисы, внутренние утилиты, прокси. Нет смысла усложнять, пока нет реального домена.

Уровень 2: маркеры Command и Query

Когда появляется настоящая бизнес-логика, сервис начинает разрастаться. Создание заказа требует валидации, расчёта скидок, отправки событий. Чтение хочет собирать данные из нескольких таблиц. Всё это в одном классе превращается в кашу.

Решение — разделить команды (запись) и запросы (чтение) явно, через маркеры Command<R> и Query<R>.

// core/order/port/in/create-order.command.ts
export class CreateOrderCommand implements Command<OrderId> {
  constructor(
    readonly customerId: CustomerId,
    readonly items: ReadonlyArray<OrderItemInput>,
  ) {}
}

// core/order/port/in/get-order.query.ts
export class GetOrderQuery implements Query<OrderSummary> {
  constructor(readonly orderId: OrderId) {}
}

Каждый use-case получает отдельный обработчик:

// application/order/create-order.handler.ts
@Injectable()
export class CreateOrderHandler implements Handler<CreateOrderCommand, OrderId> {
  constructor(
    @Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
    @Inject(TX_RUNNER) private readonly tx: TransactionRunner,
  ) {}

  async execute(cmd: CreateOrderCommand): Promise<OrderId> {
    return this.tx.run(async () => {
      const order = Order.create(cmd.customerId, cmd.items);
      await this.orders.save(order);
      return order.id;
    });
  }
}

// application/order/get-order.handler.ts
@Injectable()
export class GetOrderHandler implements Handler<GetOrderQuery, OrderSummary> {
  constructor(
    @Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
  ) {}

  async execute(query: GetOrderQuery): Promise<OrderSummary> {
    const order = await this.orders.byId(query.orderId);
    if (!order) throw new OrderNotFoundError(query.orderId);
    return toOrderSummary(order);
  }
}

Два важных момента на этом уровне:

  • Query-handler работает без транзакции. Чтение не нуждается в tx.run() — это намеренно. Транзакция здесь только добавила бы нагрузку без пользы.
  • Command возвращает минимум. CreateOrderHandler возвращает OrderId, а не полный объект. Если UI нужны данные после создания — контроллер вызывает GetOrderHandler отдельно.

Read и write пока используют один OrderRepository. Это нормально на этом уровне.

Уровень 3 split: отдельный репозиторий для чтения

Со временем запросы на чтение начинают отличаться от модели записи. Клиентский портал хочет видеть customer_name прямо в строке заказа, без join'ов. Нужна пагинация с фильтрами. TypeORM-маппинг агрегата плохо подходит для таких запросов.

Решение — завести отдельный интерфейс OrderViewRepository специально для чтения.

// core/order/port/out/order.repository.ts — для записи
export interface OrderRepository {
  byId(id: OrderId): Promise<Order | null>;
  save(order: Order): Promise<void>;
}

// core/order/port/out/order-view.repository.ts — для чтения
export interface OrderViewRepository {
  summary(orderId: OrderId): Promise<OrderSummary | null>;
  search(customerId: CustomerId, status: OrderStatus, page: PageRequest): Promise<Page<OrderSummary>>;
}

Реализация OrderViewRepository использует чистый SQL вместо TypeORM-методов:

@Injectable()
export class TypeOrmOrderViewRepository implements OrderViewRepository {
  constructor(private readonly dataSource: DataSource) {}

  async summary(orderId: OrderId): Promise<OrderSummary | null> {
    const rows = await this.dataSource.query(
      `SELECT o.id, o.status, o.customer_name, o.total_amount, o.created_at
         FROM orders o
        WHERE o.id = $1`,
      [orderId],
    );
    return rows[0] ? toOrderSummary(rows[0]) : null;
  }

  async search(customerId: CustomerId, status: OrderStatus, page: PageRequest): Promise<Page<OrderSummary>> {
    const rows = await this.dataSource.query(
      `SELECT o.id, o.status, o.customer_name, o.total_amount, o.created_at
         FROM orders o
        WHERE o.customer_id = $1
          AND ($2::text IS NULL OR o.status = $2)
        ORDER BY o.created_at DESC
        LIMIT $3 OFFSET $4`,
      [customerId, status ?? null, page.size, page.offset()],
    );
    return toPage(rows, page);
  }
}

Query-handler теперь работает только с OrderViewRepositoryOrderRepository для записи он не видит:

@Injectable()
export class GetOrderHandler implements Handler<GetOrderQuery, OrderSummary> {
  constructor(
    @Inject(ORDER_VIEW_REPOSITORY) private readonly view: OrderViewRepository,
  ) {}

  async execute(query: GetOrderQuery): Promise<OrderSummary> {
    const summary = await this.view.summary(query.orderId);
    if (!summary) throw new OrderNotFoundError(query.orderId);
    return summary;
  }
}

Физически всё по-прежнему в одной PostgreSQL — просто разные запросы. Разделение пока на уровне кода, не инфраструктуры.

Уровень 3 event-driven: отдельное хранилище для read-model

Когда нагрузка на чтение начинает мешать записи или read-проекция фундаментально другая (полнотекстовый поиск, аналитические сводки), read-model переезжает в отдельное хранилище.

Данные туда попадают через события:

Запись:                            Чтение:
  PostgreSQL                         order_summary (PG-таблица / Redis / ES)
  ├── orders                         ├── customer_name (денормализовано)
  └── outbox_events                  └── индексы под нужные запросы
        ↓
  outbox-relay (SKIP LOCKED)
        ↓
  Kafka (order.events)
        ↓
  consumer
        ↓
  UPSERT order_summary

Что появляется дополнительно:

  • Таблица outbox_events — событие пишется в той же транзакции, что и агрегат. Это гарантирует, что событие не потеряется при сбое.
  • Outbox-relay — NestJS-задача, которая читает необработанные события через SELECT ... FOR UPDATE SKIP LOCKED и публикует их в Kafka.
  • Consumer — подписывается на топик и обновляет order_summary через UPSERT.
  • Защита от повторной обработки — таблица processed_event или проверка версии в UPSERT.
@Injectable()
export class OrderEventConsumer {
  constructor(private readonly dataSource: DataSource) {}

  @EventPattern('order.events')
  async handle(payload: OrderEventPayload): Promise<void> {
    await this.dataSource.transaction(async (em) => {
      const already = await em.query(
        `SELECT 1 FROM processed_event WHERE event_id = $1`, [payload.eventId],
      );
      if (already.length) return;

      await em.query(
        `INSERT INTO order_summary (id, status, customer_name, total_amount, updated_at)
         VALUES ($1, $2, $3, $4, NOW())
         ON CONFLICT (id) DO UPDATE
           SET status = EXCLUDED.status,
               total_amount = EXCLUDED.total_amount,
               updated_at = EXCLUDED.updated_at`,
        [payload.orderId, payload.status, payload.customerName, payload.totalAmount],
      );
      await em.query(
        `INSERT INTO processed_event (event_id, processed_at) VALUES ($1, NOW())`,
        [payload.eventId],
      );
    });
  }
}

За это платят запаздыванием чтения: данные появляются в read-model через 100ms–1s в норме, дольше при сбоях. Это приемлемо для большинства UI, но нужно учитывать.

До этого порога дешевле справляется реплика для чтения плюс кеш.

Как сервис растёт по уровням

Типичный путь:

  1. Стартовали как небольшой утилитный сервис. Плоский ProductService, TypeORM Entity наружу. Уровень 1.
  2. Появился реальный домен: ввели Command<R> / Query<R>, Handler, TransactionRunner. Уровень 2.
  3. Read-сторона стала сложнее: клиентский портал хочет проекций с denormalized полями, без join. Завели ProductViewRepository с прямым SQL. Уровень 3 split.
  4. Нагрузка на чтение стала бить по write-сайду при пиках — переехали на Redis-проекцию с outbox и Kafka. Уровень 3 event-driven.

Каждый переход обоснован реальной болью, а не желанием использовать «современную архитектуру».

Частые ошибки

Маркеры без смысла. Добавить Command<R> / Query<R> на Уровне 1, не заведя ни TransactionRunner, ни разделения путей для чтения и записи — это форма для галочки. Либо переходи на Уровень 2 полностью, либо убери маркеры.

Один репозиторий при event-driven инфре. Если read-model уже в отдельном хранилище, но query-handler по-прежнему инжектит ORDER_REPOSITORY — разделение теряет смысл. Нужен отдельный ORDER_VIEW_REPOSITORY.

Прыжок через уровни. Начать с event-driven, пропустив Уровень 2 и 3-split — избыточная сложность без практики. Каждый уровень строится на предыдущем.

Query-handler с транзакцией. Оборачивать чтение в tx.run() на Уровне 2 — лишняя нагрузка. Маркер Query<R> означает именно «без транзакции».

Коротко

  • CQRS — шкала зрелости: берёшь ровно столько, сколько нужно сейчас.
  • Уровень 1 — обычный @Injectable() сервис; маркеры не нужны.
  • Уровень 2Command<R> / Query<R> обязательны; query-handler без транзакции; один OrderRepository.
  • Уровень 3 split — отдельный OrderViewRepository с прямым SQL; физически одна БД.
  • Уровень 3 event-driven — read-model в отдельном хранилище, синхронизация через outbox + Kafka; принимаешь запаздывание данных.
  • Переходы строго снизу вверх, каждый обоснован реальной нагрузкой или болью.

Что почитать дальше