Опирается на правила:
R-DIST-EC-1…R-DIST-EC-4иR-DIST-EC-X1…R-DIST-EC-X2из Distributed Patterns Rules → раздел 4. Eventual consistency.
Важно знать
- Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент обязан знать, что данные могут отставать.
- Декларация в OpenAPI (
@ApiOperation,@ApiResponse) — для каждого eventual-consistent endpoint указывай ожидаемую задержку прямо вdescription.- Read-your-writes не работает само: нужен polling, write-side endpoint, version-токен или synchronous wait в контроллере.
- Bounded staleness SLO — у каждой read-model явный лимит задержки («p99 < 5 секунд») и Prometheus-алерт, если превышается.
- Causal consistency через
version-поле в TypeORM-entity: receiver применяет событие только еслиevent.version > current.version, иначе игнорирует.- Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает устаревшие данные и теряет доверие к API.
- Strict immediate consistency через 2PC не масштабируется в Node-стеке и в Kafka-экосистеме вообще не реализуема — Kafka не XA.
@nestjs/cqrsSagas (RxJS поверх in-memory EventBus) не решают EC: события живут в одном процессе и не персистентны.
В распределённой системе невозможно одновременно иметь строгую согласованность, доступность и устойчивость к разделению сети — теорема CAP. UCP выбирает доступность и устойчивость, поэтому согласованность у нас eventual. Это решение требует явного оформления, не молчаливого «как-нибудь само догонит».
Декларация в API
R-DIST-EC-1: для endpoint, который читает eventual-consistent данные, в OpenAPI через @ApiOperation указываем ожидаемую задержку.
@ApiOperation({
summary: 'Получить список заказов клиента',
description: `
Возвращает summary заказов клиента из денормализованной read-проекции.
**Eventual consistency**: задержка от write в order-service до появления
в этом endpoint обычно < 1 секунды (p99 < 5 секунд). Если клиенту нужна
immediate consistency сразу после POST /orders — использовать GET /orders/:id,
этот endpoint читает write-side.
`.trim(),
})
@ApiResponse({ status: 200, description: 'Список заказов (могут отставать на 1–5 секунд)' })
@Get(':customerId/orders')
async getOrderSummaries(@Param('customerId') customerId: string) {
return this.orderProjectionService.findByCustomer(customerId);
}
Без этой декларации разработчик на клиентской стороне напишет тест POST /orders + сразу GET /customers/:id/orders и будет открывать баг-репорты, когда заказ ещё не появился в проекции.
Read-your-writes — четыре способа
R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача, которая не решается сама.
1. Polling в клиенте
Клиент делает POST /orders, получает orderId, потом polling-ом GET до появления в read-проекции:
// client-side: browser / BFF
async function createOrderAndWait(payload: CreateOrderDto): Promise<OrderSummaryDto> {
const { orderId } = await orderApi.create(payload);
for (let i = 0; i < 20; i++) {
const summary = await orderApi.getSummary(orderId);
if (summary) return summary;
await sleep(200);
}
throw new Error(`OrderSummary not available for ${orderId} after 4s`);
}
Простой, клиент платит latency. Применимо для UI с прогресс-индикатором.
2. Synchronous wait в контроллере
Контроллер после write выполняет короткий polling read-model на стороне сервера:
@Post()
async create(@Body() dto: CreateOrderDto): Promise<OrderSummaryDto> {
const order = await this.createOrderUseCase.execute(dto);
const projection = await this.waitForProjection(order.id, 2000);
return projection ?? OrderSummaryDto.fromAggregate(order);
}
private async waitForProjection(orderId: string, timeoutMs: number): Promise<OrderSummaryDto | null> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const found = await this.projectionRepo.findByOrderId(orderId);
if (found) return found;
await sleep(100);
}
return null;
}
Подходит для критичных flow «создать заказ и сразу показать summary».
3. Version-токен клиенту
Контроллер возвращает version из write-side вместе с ответом. Клиент передаёт его в следующий GET, read-model ждёт, пока не догонит:
@Post()
async create(@Body() dto: CreateOrderDto): Promise<{ orderId: string; version: number }> {
const order = await this.createOrderUseCase.execute(dto);
return { orderId: order.id, version: order.version };
}
@Get(':orderId')
async getWithVersion(
@Param('orderId') orderId: string,
@Query('minVersion') minVersion?: string,
): Promise<OrderSummaryDto> {
const min = minVersion ? parseInt(minVersion, 10) : 0;
if (min > 0) {
const deadline = Date.now() + 3000;
while (Date.now() < deadline) {
const proj = await this.projectionRepo.findByOrderId(orderId);
if (proj && proj.version >= min) return proj;
await sleep(100);
}
}
return this.projectionRepo.findByOrderIdOrThrow(orderId);
}
4. Альтернативный endpoint из write-side
Два endpoint с явно разной семантикой:
POST /orders → CreateOrderUseCase (write-side)
GET /orders/:id → OrderRepository (write-side, immediate consistency)
GET /customers/:id/orders → OrderSummaryRepository (read-projection, eventual consistency)
Это рабочая комбинация: проекция держит нагрузку CQRS; immediate-endpoint обслуживает flow «сразу после write».
Bounded staleness SLO
R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку. Измеряем метрикой.
import { Counter, Histogram, Registry } from 'prom-client';
@Injectable()
export class OrderProjectionConsumer {
private readonly stalenessHistogram: Histogram;
constructor(
private readonly registry: Registry,
private readonly projectionRepo: OrderSummaryRepository,
) {
this.stalenessHistogram = new Histogram({
name: 'read_model_staleness_seconds',
help: 'Задержка между событием и обновлением read-model',
labelNames: ['model', 'event_type'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
registers: [this.registry],
});
}
@EventPattern('order.events')
async handleOrderCreated(@Payload() event: OrderCreatedEvent): Promise<void> {
const staleness = (Date.now() - new Date(event.occurredAt).getTime()) / 1000;
this.stalenessHistogram.observe({ model: 'order_summary', event_type: event.type }, staleness);
await this.projectionRepo.upsert(toOrderSummary(event));
}
}
Алерт в Prometheus:
- alert: ReadModelStalenessHigh
expr: >
histogram_quantile(0.99,
sum by (le, model) (rate(read_model_staleness_seconds_bucket[5m]))
) > 5
for: 5m
annotations:
summary: "Read-model {{ $labels.model }} отстаёт более 5 секунд (p99)"
Без SLO read-model может тихо отстать на час, и об этом узнают из жалоб клиентов.
Causal consistency через version
R-DIST-EC-4: когда порядок событий важен, receiver проверяет монотонность version-поля и пропускает out-of-order events.
TypeORM-entity read-model с version:
@Entity('order_summary')
export class OrderSummaryEntity {
@PrimaryColumn('uuid')
orderId: string;
@Column('int')
version: number;
@Column('varchar')
status: string;
@Column('jsonb', { nullable: true })
payload: Record<string, unknown>;
}
Consumer с проверкой монотонности:
@EventPattern('order.events')
@UseInterceptors(ProcessedEventInterceptor)
async handleOrderUpdated(@Payload() event: OrderUpdatedEvent): Promise<void> {
await this.dataSource.transaction(async (m) => {
const current = await m.findOne(OrderSummaryEntity, { where: { orderId: event.orderId } });
if (current && event.aggregateVersion <= current.version) {
return;
}
await m.upsert(
OrderSummaryEntity,
{
orderId: event.orderId,
version: event.aggregateVersion,
status: event.status,
payload: event.payload,
},
['orderId'],
);
});
}
aggregateVersion — монотонно растущий счётчик, увеличивается на write-side при каждом изменении агрегата. При rebalance Kafka или retry из retry-topic consumer может получить события не по порядку — out-of-order события пропускаются, финальное состояние проекции остаётся корректным.
Vector clocks нужны редко — обычно достаточно скалярного version на агрегат.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Endpoint возвращает stale data без единого слова в OpenAPI | R-DIST-EC-X1 | @ApiOperation({ description: '...Eventual consistency: задержка до N секунд...' }) |
| 2PC для immediate consistency между сервисами | R-DIST-EC-X2 | redesign boundary, modular monolith или прими EC |
| Read-model без SLO на задержку | R-DIST-EC-3 | read_model_staleness_seconds + Prometheus-алерт |
| Out-of-order events применяются как есть | R-DIST-EC-4 | event.version > current.version до upsert |
| Polling без timeout и fallback | R-DIST-EC-2 | deadline + OrderSummaryDto.fromAggregate(order) как fallback |
@nestjs/cqrs Sagas (RxJS) для durable-оркестрации | R-DIST-SAGA-X3 | Temporal (@temporalio/*) или kafkajs-consumer + saga_<name> таблица |
Прямой producer.send после commit вместо outbox | R-DIST-OBX-X1 | outbox в той же DataSource.transaction |
Куда дальше
- Saga в NestJS —
sagaIdсквозной, in-flight саги — отдельный случай EC. - Idempotency в NestJS — consumer при retry обязан быть идемпотентным.
- Outbox + Inbox в NestJS — главный механизм синхронизации в EC.
- Compensation в NestJS — semantic compensation при частичных сбоях.
- Distributed transactions в NestJS — почему 2PC/XA не работает в Node-стеке.
- Когда применять — выбор между EC, modular monolith и redesign boundary.