Опирается на правила: R-CQRS-SYNC-1R-CQRS-SYNC-5 и R-CQRS-SYNC-X1R-CQRS-SYNC-X3 из CQRS Style Guide → раздел 5. Синхронизация через события.

Важно знать

  • Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-событие летит из той же транзакции, что и изменение агрегата.
  • Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита — processed_event таблица или idempotent UPDATE по version.
  • Bootstrap-rebuild при первом запуске или потере read-model: batch проход по агрегатам, не ждём пока придут events за 30 дней.
  • Eventual consistency декларируется в API. OpenAPI явно пишет «возможна задержка до 1 секунды».
  • Read-your-writes при необходимости — через sticky session, polling read-model в command-handler-е или альтернативный endpoint без CQRS-split.
  • Никакого synchronous UPDATE read-model в command-handler. Decoupling сразу теряется.
  • Никаких PG-триггеров для sync — невидимая магия, ломается на bulk-операциях, не работает cross-DB.
  • Никаких schema-coupled events: event-payload — независимый record, не generated POJO write-схемы.

Сердце CQRS — это не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений, и он же определяет всё: latency, отказоустойчивость, формат данных. Outbox + Kafka — стандартная связка; всё остальное в этой статье — детали её правильного применения. Раскрытие раздела 5 гайда.

Outbox-pattern: атомарность с агрегатом

R-CQRS-SYNC-1: write-handler регистрирует событие через registerEvent агрегата; на repository.save(order) событие записывается в outbox-таблицу той же БД, в той же транзакции, что и изменение order.

COMMIT того, что изменилось:
  1. UPDATE order SET status = 'CONFIRMED' WHERE id = 42
  2. INSERT INTO outbox (event_type, payload, aggregate_id) 
       VALUES ('OrderConfirmed', '{...}', 42)
  3. (всё атомарно, либо обе строки, либо ни одной)

После commit:

  • Outbox-relay — отдельный scheduled bean, который раз в N миллисекунд делает SELECT … FOR UPDATE SKIP LOCKED LIMIT 100 из outbox, публикует в Kafka, помечает как опубликованное.
  • Producer idempotent. Kafka обеспечивает at-least-once с дедупом на уровне partition (см. Kafka → producer).

Зачем outbox, почему нельзя «после commit пушнуть в Kafka»:

  • Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется.
  • Commit failed, Kafka уже получила — phantom event, в БД нет соответствующего state.
  • Outbox решает обе проблемы: пока строка лежит в outbox, до Kafka она дойдёт; пока не дошла — relay будет ретраить.

Подробно — в Kafka → Outbox publishing.

Idempotent consumer

R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once в Kafka означает: одно сообщение может прийти дважды. Без защиты read-model становится несогласованной.

Два варианта защиты:

processed_event таблица

CREATE TABLE processed_event (
    event_id     UUID PRIMARY KEY,
    consumer     TEXT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
    if (processedEventRepository.exists(event.eventId(), "order-summary-projector")) {
        return; // дубликат
    }
    processedEventRepository.markProcessed(event.eventId(), "order-summary-projector");
    orderSummaryRepository.updateStatus(event.orderId(), "CONFIRMED", event.confirmedAt());
}

Плюс: гарантия точного дедупа. Минус: ещё одна таблица под нагрузкой записи.

Idempotent UPDATE по version

В read-таблице есть колонка version (или last_event_seq). UPDATE применяется только если событие новее:

@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
    int updated = orderSummaryRepository.updateStatusIfNewer(
        event.orderId(),
        "CONFIRMED",
        event.confirmedAt(),
        event.aggregateVersion()
    );
    if (updated == 0) {
        log.debug("skip stale or duplicate event: {}", event.eventId());
    }
}
UPDATE order_summary
SET status = 'CONFIRMED', confirmed_at = ?, version = ?, updated_at = NOW()
WHERE order_id = ? AND version < ?

Плюс: не нужна отдельная таблица. Минус: требует aggregateVersion в каждом событии. Подходит когда события одного агрегата идут строго по порядку (одна партиция Kafka по aggregate_id).

Bootstrap и disaster recovery — synchronous batch

R-CQRS-SYNC-3: при первом запуске нового read-store или после потери — не ждём пока события придут из Kafka. Запускаем batch-rebuild.

@Component
@RequiredArgsConstructor
public class OrderSummaryBootstrap implements ApplicationRunner {

    private final OrderSummaryRepository orderSummaryRepository;
    private final OrderRepository orderRepository;

    @Override
    public void run(ApplicationArguments args) {
        if (!orderSummaryRepository.isEmpty()) {
            return; // уже заполнено
        }
        log.info("read-model order_summary is empty — running bootstrap rebuild");
        rebuildAll();
    }

    void rebuildAll() {
        long lastId = 0;
        while (true) {
            List<Order> batch = orderRepository.findAllAfter(lastId, 1000);
            if (batch.isEmpty()) break;
            orderSummaryRepository.upsertBatch(batch.stream().map(this::toSummary).toList());
            lastId = batch.getLast().id().value();
        }
    }
}

Не путать с обычной работой: bootstrap — разовый при создании read-store, не каждый раз при старте. Триггеры:

  • Новый read-store (например, добавили ElasticSearch — он пустой).
  • Disaster recovery (read-store потерян, восстанавливаем из write).
  • Структурная миграция (добавили колонку, нужно дозаполнить).

Также этот же скрипт пригодится для ручного rebuild по одной записи, если consumer пропустил событие из-за бага.

Eventual consistency декларируется в API

R-CQRS-SYNC-4: если endpoint обслуживает read-model — это должно быть видно в OpenAPI. Клиент не должен догадываться.

paths:
  /orders/{id}/summary:
    get:
      summary: Get order summary (read-projection)
      description: |
        Возвращает read-проекцию заказа.
        Возможна задержка до 1 секунды между write-операцией и появлением 
        обновления в этой проекции.
        Для immediate consistency (например, сразу после POST /orders) 
        используйте GET /orders/{id} (полный агрегат через write-store).
      responses:
        '200':
          description: ...

Зачем:

  • Клиент знает, что после POST /orders сразу делать GET /orders/{id}/summary — может вернуть 404 или старое состояние.
  • Если клиенту нужна immediate consistency, он либо использует другой endpoint, либо применяет read-your-writes (см. ниже).
  • В тестах и production troubleshooting eventual consistency — это не баг, а архитектурное свойство, и оно задокументировано.

Read-your-writes — три механизма

R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:

1. Sticky session в gateway

Запросы одного клиента приходят на тот же pod, который только что обработал его write. Если read-model — это локальный bean в этом же сервисе (например, in-memory cache, который consumer обновил синхронно после commit), клиент увидит свежие данные.

Не работает кросс-pod, кросс-сервис. Хрупко.

2. Polling read-model в command-handler-е

@Override
@Transactional
public OrderId handle(CreateOrderCommand cmd) {
    Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
    orderRepository.save(order);
    // post-commit hook: ждём появления в read-model до 2 секунд
    TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronization() {
            @Override public void afterCommit() {
                pollUntilVisible(order.id(), Duration.ofSeconds(2));
            }
        }
    );
    return order.id();
}

Контракт: 99% случаев укладывается в 2 секунды, в остальных — 504 / async-pattern.

Минус: реальный latency POST становится p99 = 2s. Не подходит для частых операций.

3. Альтернативный endpoint без CQRS-split

Самое простое: для критичных к immediate consistency сценариев — отдельный endpoint, который читает прямо из write-store.

paths:
  /orders/{id}:
    get:
      summary: Get order (immediate consistency, from write-store)

  /orders/{id}/summary:
    get:
      summary: Get order summary (eventual consistency, from read-projection)

В большинстве случаев это правильное решение — два endpoint'а явно показывают trade-off клиенту.

Что запрещено

Synchronous UPDATE read-model в command-handler

R-CQRS-SYNC-X1: в command-handler-е нет orderSummaryRepository.update(...) после save.

// ПЛОХО
@Transactional
public OrderId handle(ConfirmOrderCommand cmd) {
    Order order = orderRepository.findById(..., FOR_UPDATE).orElseThrow(...);
    order.confirm();
    orderRepository.save(order);
    orderSummaryRepository.updateStatus(order.id(), "CONFIRMED");  // ← синхронный sync
    return order.id();
}

Что не так:

  • Decoupling утрачен. ALTER TABLE order_summary теперь блокирует write-транзакции. Любая авария read-store роняет write.
  • Atomic guarantee ложная. Если order_summary в другой БД, надо 2PC (запрещён). Если в той же — но через другой connection / pool — гарантия только в рамках одной транзакции.
  • Без outbox sync не работает между сервисами. Если read-store обслуживается другим сервисом, sync через метод нерелевантен.

Корректно: outbox в той же транзакции, что и order. Relay публикует в Kafka. Consumer обновляет order_summary.

Sync через PG triggers

R-CQRS-SYNC-X2: CREATE TRIGGER на write-таблице, который пишет в read-таблицу при UPDATE.

-- ПЛОХО
CREATE OR REPLACE FUNCTION sync_order_summary() RETURNS TRIGGER AS $$
BEGIN
    UPDATE order_summary SET status = NEW.status WHERE order_id = NEW.id;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER order_summary_sync AFTER UPDATE ON "order"
    FOR EACH ROW EXECUTE FUNCTION sync_order_summary();

Что не так:

  • Невидимая магия. Программисты, читающие Java-код, не подозревают, что UPDATE order обновляет ещё и order_summary.
  • Не работает на bulk. UPDATE order SET status = ... WHERE created_at < ? запустит триггер на каждую строку — медленно.
  • Не масштабируется на cross-DB / cross-service.
  • Тестировать сложно. Юнит-тесты не видят триггер, integration-тесты должны его подгрузить.

Корректно: явный consumer на событие в Java-коде. Tracebale, тестируемо, масштабируемо.

Schema-coupled events

R-CQRS-SYNC-X3: event-payload не должен быть generated POJO write-схемы. Любой ALTER TABLE на write-side ломает consumers.

// ПЛОХО — event payload содержит generated row write-таблицы
public record OrderConfirmed(OrderRecord row) {}
// где OrderRecord — jooq-generated класс, отражающий точную структуру таблицы

// ХОРОШО — независимый event-record, версионируется отдельно
public record OrderConfirmed(
    UUID eventId,
    Long orderId,
    OrderStatus status,
    Instant confirmedAt,
    long aggregateVersion
) {}

Зачем независимый record:

  • Можно ALTER write-таблицу (добавить колонку, переименовать существующую) без поломки consumers.
  • Можно версионировать event (OrderConfirmedV1OrderConfirmedV2) explicitly.
  • Можно сериализовать события одинаково для in-process и cross-service consumers.

Подробно — в Kafka → Event versioning.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Synchronous UPDATE read-model в command-handlerR-CQRS-SYNC-X1Outbox + Kafka + consumer
PG-trigger sync write-таблицы → read-таблицыR-CQRS-SYNC-X2Явный consumer в Java
Event-payload — generated POJO write-схемыR-CQRS-SYNC-X3Независимый record с версионированием
Consumer без idempotency-защитыR-CQRS-SYNC-2processed_event или idempotent UPDATE по version
Eventual consistency не декларирована в APIR-CQRS-SYNC-4Явный description в OpenAPI
Ожидание заполнения read-model от Kafka на bootstrapR-CQRS-SYNC-3Batch-rebuild из write-store

Куда дальше