Опирается на правила: R-DIST-EC-1R-DIST-EC-4 и R-DIST-EC-X1R-DIST-EC-X2 из Distributed Patterns Rules → раздел 4. Eventual consistency.

Важно знать

  • Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент обязан знать, что данные могут отставать.
  • Декларация в OpenAPI (@ApiOperation, @ApiResponse) — для каждого eventual-consistent endpoint указывай ожидаемую задержку прямо в description.
  • Read-your-writes не работает само: нужен polling, write-side endpoint, version-токен или synchronous wait в контроллере.
  • Bounded staleness SLO — у каждой read-model явный лимит задержки («p99 < 5 секунд») и Prometheus-алерт, если превышается.
  • Causal consistency через version-поле в TypeORM-entity: receiver применяет событие только если event.version > current.version, иначе игнорирует.
  • Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает устаревшие данные и теряет доверие к API.
  • Strict immediate consistency через 2PC не масштабируется в Node-стеке и в Kafka-экосистеме вообще не реализуема — Kafka не XA.
  • @nestjs/cqrs Sagas (RxJS поверх in-memory EventBus) не решают EC: события живут в одном процессе и не персистентны.

В распределённой системе невозможно одновременно иметь строгую согласованность, доступность и устойчивость к разделению сети — теорема CAP. UCP выбирает доступность и устойчивость, поэтому согласованность у нас eventual. Это решение требует явного оформления, не молчаливого «как-нибудь само догонит».

Декларация в API

R-DIST-EC-1: для endpoint, который читает eventual-consistent данные, в OpenAPI через @ApiOperation указываем ожидаемую задержку.

@ApiOperation({
  summary: 'Получить список заказов клиента',
  description: `
Возвращает summary заказов клиента из денормализованной read-проекции.

**Eventual consistency**: задержка от write в order-service до появления
в этом endpoint обычно < 1 секунды (p99 < 5 секунд). Если клиенту нужна
immediate consistency сразу после POST /orders — использовать GET /orders/:id,
этот endpoint читает write-side.
  `.trim(),
})
@ApiResponse({ status: 200, description: 'Список заказов (могут отставать на 1–5 секунд)' })
@Get(':customerId/orders')
async getOrderSummaries(@Param('customerId') customerId: string) {
  return this.orderProjectionService.findByCustomer(customerId);
}

Без этой декларации разработчик на клиентской стороне напишет тест POST /orders + сразу GET /customers/:id/orders и будет открывать баг-репорты, когда заказ ещё не появился в проекции.

Read-your-writes — четыре способа

R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача, которая не решается сама.

1. Polling в клиенте

Клиент делает POST /orders, получает orderId, потом polling-ом GET до появления в read-проекции:

// client-side: browser / BFF
async function createOrderAndWait(payload: CreateOrderDto): Promise<OrderSummaryDto> {
  const { orderId } = await orderApi.create(payload);

  for (let i = 0; i < 20; i++) {
    const summary = await orderApi.getSummary(orderId);
    if (summary) return summary;
    await sleep(200);
  }
  throw new Error(`OrderSummary not available for ${orderId} after 4s`);
}

Простой, клиент платит latency. Применимо для UI с прогресс-индикатором.

2. Synchronous wait в контроллере

Контроллер после write выполняет короткий polling read-model на стороне сервера:

@Post()
async create(@Body() dto: CreateOrderDto): Promise<OrderSummaryDto> {
  const order = await this.createOrderUseCase.execute(dto);

  const projection = await this.waitForProjection(order.id, 2000);
  return projection ?? OrderSummaryDto.fromAggregate(order);
}

private async waitForProjection(orderId: string, timeoutMs: number): Promise<OrderSummaryDto | null> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const found = await this.projectionRepo.findByOrderId(orderId);
    if (found) return found;
    await sleep(100);
  }
  return null;
}

Подходит для критичных flow «создать заказ и сразу показать summary».

3. Version-токен клиенту

Контроллер возвращает version из write-side вместе с ответом. Клиент передаёт его в следующий GET, read-model ждёт, пока не догонит:

@Post()
async create(@Body() dto: CreateOrderDto): Promise<{ orderId: string; version: number }> {
  const order = await this.createOrderUseCase.execute(dto);
  return { orderId: order.id, version: order.version };
}

@Get(':orderId')
async getWithVersion(
  @Param('orderId') orderId: string,
  @Query('minVersion') minVersion?: string,
): Promise<OrderSummaryDto> {
  const min = minVersion ? parseInt(minVersion, 10) : 0;

  if (min > 0) {
    const deadline = Date.now() + 3000;
    while (Date.now() < deadline) {
      const proj = await this.projectionRepo.findByOrderId(orderId);
      if (proj && proj.version >= min) return proj;
      await sleep(100);
    }
  }

  return this.projectionRepo.findByOrderIdOrThrow(orderId);
}

4. Альтернативный endpoint из write-side

Два endpoint с явно разной семантикой:

POST /orders                      → CreateOrderUseCase (write-side)
GET /orders/:id                   → OrderRepository (write-side, immediate consistency)
GET /customers/:id/orders         → OrderSummaryRepository (read-projection, eventual consistency)

Это рабочая комбинация: проекция держит нагрузку CQRS; immediate-endpoint обслуживает flow «сразу после write».

Bounded staleness SLO

R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку. Измеряем метрикой.

import { Counter, Histogram, Registry } from 'prom-client';

@Injectable()
export class OrderProjectionConsumer {
  private readonly stalenessHistogram: Histogram;

  constructor(
    private readonly registry: Registry,
    private readonly projectionRepo: OrderSummaryRepository,
  ) {
    this.stalenessHistogram = new Histogram({
      name: 'read_model_staleness_seconds',
      help: 'Задержка между событием и обновлением read-model',
      labelNames: ['model', 'event_type'],
      buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
      registers: [this.registry],
    });
  }

  @EventPattern('order.events')
  async handleOrderCreated(@Payload() event: OrderCreatedEvent): Promise<void> {
    const staleness = (Date.now() - new Date(event.occurredAt).getTime()) / 1000;
    this.stalenessHistogram.observe({ model: 'order_summary', event_type: event.type }, staleness);

    await this.projectionRepo.upsert(toOrderSummary(event));
  }
}

Алерт в Prometheus:

- alert: ReadModelStalenessHigh
  expr: >
    histogram_quantile(0.99,
      sum by (le, model) (rate(read_model_staleness_seconds_bucket[5m]))
    ) > 5
  for: 5m
  annotations:
    summary: "Read-model {{ $labels.model }} отстаёт более 5 секунд (p99)"

Без SLO read-model может тихо отстать на час, и об этом узнают из жалоб клиентов.

Causal consistency через version

R-DIST-EC-4: когда порядок событий важен, receiver проверяет монотонность version-поля и пропускает out-of-order events.

TypeORM-entity read-model с version:

@Entity('order_summary')
export class OrderSummaryEntity {
  @PrimaryColumn('uuid')
  orderId: string;

  @Column('int')
  version: number;

  @Column('varchar')
  status: string;

  @Column('jsonb', { nullable: true })
  payload: Record<string, unknown>;
}

Consumer с проверкой монотонности:

@EventPattern('order.events')
@UseInterceptors(ProcessedEventInterceptor)
async handleOrderUpdated(@Payload() event: OrderUpdatedEvent): Promise<void> {
  await this.dataSource.transaction(async (m) => {
    const current = await m.findOne(OrderSummaryEntity, { where: { orderId: event.orderId } });

    if (current && event.aggregateVersion <= current.version) {
      return;
    }

    await m.upsert(
      OrderSummaryEntity,
      {
        orderId: event.orderId,
        version: event.aggregateVersion,
        status: event.status,
        payload: event.payload,
      },
      ['orderId'],
    );
  });
}

aggregateVersion — монотонно растущий счётчик, увеличивается на write-side при каждом изменении агрегата. При rebalance Kafka или retry из retry-topic consumer может получить события не по порядку — out-of-order события пропускаются, финальное состояние проекции остаётся корректным.

Vector clocks нужны редко — обычно достаточно скалярного version на агрегат.

Что запрещено

АнтипаттернПравилоЧто взамен
Endpoint возвращает stale data без единого слова в OpenAPIR-DIST-EC-X1@ApiOperation({ description: '...Eventual consistency: задержка до N секунд...' })
2PC для immediate consistency между сервисамиR-DIST-EC-X2redesign boundary, modular monolith или прими EC
Read-model без SLO на задержкуR-DIST-EC-3read_model_staleness_seconds + Prometheus-алерт
Out-of-order events применяются как естьR-DIST-EC-4event.version > current.version до upsert
Polling без timeout и fallbackR-DIST-EC-2deadline + OrderSummaryDto.fromAggregate(order) как fallback
@nestjs/cqrs Sagas (RxJS) для durable-оркестрацииR-DIST-SAGA-X3Temporal (@temporalio/*) или kafkajs-consumer + saga_<name> таблица
Прямой producer.send после commit вместо outboxR-DIST-OBX-X1outbox в той же DataSource.transaction

Куда дальше

  • Saga в NestJS — sagaId сквозной, in-flight саги — отдельный случай EC.
  • Idempotency в NestJS — consumer при retry обязан быть идемпотентным.
  • Outbox + Inbox в NestJS — главный механизм синхронизации в EC.
  • Compensation в NestJS — semantic compensation при частичных сбоях.
  • Distributed transactions в NestJS — почему 2PC/XA не работает в Node-стеке.
  • Когда применять — выбор между EC, modular monolith и redesign boundary.