CQRS — не переключатель «включить/выключить». Это шкала: берёшь ровно столько, сколько нужно сейчас, и добавляешь следующий уровень по мере роста. Начинать сразу с event-driven архитектуры на молодом сервисе — дорого и бессмысленно.
Разберём четыре ступени: от обычного сервиса до полноценной read-model с Kafka.
Уровень 1: обычный сервис без CQRS
Большинство сервисов начинают отсюда. Один @Injectable() класс, методы для чтения и записи в одном месте, TypeORM работает напрямую.
@Injectable()
export class OrderService {
constructor(private readonly dataSource: DataSource) {}
async createOrder(dto: CreateOrderDto): Promise<string> {
const order = Order.create(dto.customerId, dto.items);
await this.dataSource.getRepository(OrderEntity).save(order.toEntity());
return order.id;
}
async getOrder(id: string): Promise<OrderDto> {
const row = await this.dataSource.getRepository(OrderEntity).findOneBy({ id });
if (!row) throw new OrderNotFoundError(id);
return toOrderDto(row);
}
}
Когда это нормально: простые CRUD-сервисы, внутренние утилиты, прокси. Нет смысла усложнять, пока нет реального домена.
Уровень 2: маркеры Command и Query
Когда появляется настоящая бизнес-логика, сервис начинает разрастаться. Создание заказа требует валидации, расчёта скидок, отправки событий. Чтение хочет собирать данные из нескольких таблиц. Всё это в одном классе превращается в кашу.
Решение — разделить команды (запись) и запросы (чтение) явно, через маркеры Command<R> и Query<R>.
// core/order/port/in/create-order.command.ts
export class CreateOrderCommand implements Command<OrderId> {
constructor(
readonly customerId: CustomerId,
readonly items: ReadonlyArray<OrderItemInput>,
) {}
}
// core/order/port/in/get-order.query.ts
export class GetOrderQuery implements Query<OrderSummary> {
constructor(readonly orderId: OrderId) {}
}
Каждый use-case получает отдельный обработчик:
// application/order/create-order.handler.ts
@Injectable()
export class CreateOrderHandler implements Handler<CreateOrderCommand, OrderId> {
constructor(
@Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
@Inject(TX_RUNNER) private readonly tx: TransactionRunner,
) {}
async execute(cmd: CreateOrderCommand): Promise<OrderId> {
return this.tx.run(async () => {
const order = Order.create(cmd.customerId, cmd.items);
await this.orders.save(order);
return order.id;
});
}
}
// application/order/get-order.handler.ts
@Injectable()
export class GetOrderHandler implements Handler<GetOrderQuery, OrderSummary> {
constructor(
@Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
) {}
async execute(query: GetOrderQuery): Promise<OrderSummary> {
const order = await this.orders.byId(query.orderId);
if (!order) throw new OrderNotFoundError(query.orderId);
return toOrderSummary(order);
}
}
Два важных момента на этом уровне:
- Query-handler работает без транзакции. Чтение не нуждается в
tx.run()— это намеренно. Транзакция здесь только добавила бы нагрузку без пользы. - Command возвращает минимум.
CreateOrderHandlerвозвращаетOrderId, а не полный объект. Если UI нужны данные после создания — контроллер вызываетGetOrderHandlerотдельно.
Read и write пока используют один OrderRepository. Это нормально на этом уровне.
Уровень 3 split: отдельный репозиторий для чтения
Со временем запросы на чтение начинают отличаться от модели записи. Клиентский портал хочет видеть customer_name прямо в строке заказа, без join'ов. Нужна пагинация с фильтрами. TypeORM-маппинг агрегата плохо подходит для таких запросов.
Решение — завести отдельный интерфейс OrderViewRepository специально для чтения.
// core/order/port/out/order.repository.ts — для записи
export interface OrderRepository {
byId(id: OrderId): Promise<Order | null>;
save(order: Order): Promise<void>;
}
// core/order/port/out/order-view.repository.ts — для чтения
export interface OrderViewRepository {
summary(orderId: OrderId): Promise<OrderSummary | null>;
search(customerId: CustomerId, status: OrderStatus, page: PageRequest): Promise<Page<OrderSummary>>;
}
Реализация OrderViewRepository использует чистый SQL вместо TypeORM-методов:
@Injectable()
export class TypeOrmOrderViewRepository implements OrderViewRepository {
constructor(private readonly dataSource: DataSource) {}
async summary(orderId: OrderId): Promise<OrderSummary | null> {
const rows = await this.dataSource.query(
`SELECT o.id, o.status, o.customer_name, o.total_amount, o.created_at
FROM orders o
WHERE o.id = $1`,
[orderId],
);
return rows[0] ? toOrderSummary(rows[0]) : null;
}
async search(customerId: CustomerId, status: OrderStatus, page: PageRequest): Promise<Page<OrderSummary>> {
const rows = await this.dataSource.query(
`SELECT o.id, o.status, o.customer_name, o.total_amount, o.created_at
FROM orders o
WHERE o.customer_id = $1
AND ($2::text IS NULL OR o.status = $2)
ORDER BY o.created_at DESC
LIMIT $3 OFFSET $4`,
[customerId, status ?? null, page.size, page.offset()],
);
return toPage(rows, page);
}
}
Query-handler теперь работает только с OrderViewRepository — OrderRepository для записи он не видит:
@Injectable()
export class GetOrderHandler implements Handler<GetOrderQuery, OrderSummary> {
constructor(
@Inject(ORDER_VIEW_REPOSITORY) private readonly view: OrderViewRepository,
) {}
async execute(query: GetOrderQuery): Promise<OrderSummary> {
const summary = await this.view.summary(query.orderId);
if (!summary) throw new OrderNotFoundError(query.orderId);
return summary;
}
}
Физически всё по-прежнему в одной PostgreSQL — просто разные запросы. Разделение пока на уровне кода, не инфраструктуры.
Уровень 3 event-driven: отдельное хранилище для read-model
Когда нагрузка на чтение начинает мешать записи или read-проекция фундаментально другая (полнотекстовый поиск, аналитические сводки), read-model переезжает в отдельное хранилище.
Данные туда попадают через события:
Запись: Чтение:
PostgreSQL order_summary (PG-таблица / Redis / ES)
├── orders ├── customer_name (денормализовано)
└── outbox_events └── индексы под нужные запросы
↓
outbox-relay (SKIP LOCKED)
↓
Kafka (order.events)
↓
consumer
↓
UPSERT order_summary
Что появляется дополнительно:
- Таблица
outbox_events— событие пишется в той же транзакции, что и агрегат. Это гарантирует, что событие не потеряется при сбое. - Outbox-relay — NestJS-задача, которая читает необработанные события через
SELECT ... FOR UPDATE SKIP LOCKEDи публикует их в Kafka. - Consumer — подписывается на топик и обновляет
order_summaryчерез UPSERT. - Защита от повторной обработки — таблица
processed_eventили проверка версии в UPSERT.
@Injectable()
export class OrderEventConsumer {
constructor(private readonly dataSource: DataSource) {}
@EventPattern('order.events')
async handle(payload: OrderEventPayload): Promise<void> {
await this.dataSource.transaction(async (em) => {
const already = await em.query(
`SELECT 1 FROM processed_event WHERE event_id = $1`, [payload.eventId],
);
if (already.length) return;
await em.query(
`INSERT INTO order_summary (id, status, customer_name, total_amount, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (id) DO UPDATE
SET status = EXCLUDED.status,
total_amount = EXCLUDED.total_amount,
updated_at = EXCLUDED.updated_at`,
[payload.orderId, payload.status, payload.customerName, payload.totalAmount],
);
await em.query(
`INSERT INTO processed_event (event_id, processed_at) VALUES ($1, NOW())`,
[payload.eventId],
);
});
}
}
За это платят запаздыванием чтения: данные появляются в read-model через 100ms–1s в норме, дольше при сбоях. Это приемлемо для большинства UI, но нужно учитывать.
До этого порога дешевле справляется реплика для чтения плюс кеш.
Как сервис растёт по уровням
Типичный путь:
- Стартовали как небольшой утилитный сервис. Плоский
ProductService, TypeORM Entity наружу. Уровень 1. - Появился реальный домен: ввели
Command<R>/Query<R>,Handler,TransactionRunner. Уровень 2. - Read-сторона стала сложнее: клиентский портал хочет проекций с denormalized полями, без join. Завели
ProductViewRepositoryс прямым SQL. Уровень 3 split. - Нагрузка на чтение стала бить по write-сайду при пиках — переехали на Redis-проекцию с outbox и Kafka. Уровень 3 event-driven.
Каждый переход обоснован реальной болью, а не желанием использовать «современную архитектуру».
Частые ошибки
Маркеры без смысла. Добавить Command<R> / Query<R> на Уровне 1, не заведя ни TransactionRunner, ни разделения путей для чтения и записи — это форма для галочки. Либо переходи на Уровень 2 полностью, либо убери маркеры.
Один репозиторий при event-driven инфре. Если read-model уже в отдельном хранилище, но query-handler по-прежнему инжектит ORDER_REPOSITORY — разделение теряет смысл. Нужен отдельный ORDER_VIEW_REPOSITORY.
Прыжок через уровни. Начать с event-driven, пропустив Уровень 2 и 3-split — избыточная сложность без практики. Каждый уровень строится на предыдущем.
Query-handler с транзакцией. Оборачивать чтение в tx.run() на Уровне 2 — лишняя нагрузка. Маркер Query<R> означает именно «без транзакции».
Коротко
- CQRS — шкала зрелости: берёшь ровно столько, сколько нужно сейчас.
- Уровень 1 — обычный
@Injectable()сервис; маркеры не нужны. - Уровень 2 —
Command<R>/Query<R>обязательны; query-handler без транзакции; одинOrderRepository. - Уровень 3 split — отдельный
OrderViewRepositoryс прямым SQL; физически одна БД. - Уровень 3 event-driven — read-model в отдельном хранилище, синхронизация через outbox + Kafka; принимаешь запаздывание данных.
- Переходы строго снизу вверх, каждый обоснован реальной нагрузкой или болью.
Что почитать дальше
- Когда CQRS оправдан в NestJS — пороги перехода между уровнями.
- Command side в NestJS — как устроен write-handler на Уровне 2 и 3.
- Query side в NestJS — query-handler с
ViewRepositoryи прямым SQL.