Опирается на правила:
R-CQRS-SYNC-1…R-CQRS-SYNC-5иR-CQRS-SYNC-X1…R-CQRS-SYNC-X3из CQRS Style Guide → раздел 5. Синхронизация через события.
Важно знать
- Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-событие летит из той же транзакции, что и изменение агрегата.
- Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита —
processed_eventтаблица или idempotent UPDATE поversion.- Bootstrap-rebuild при первом запуске или потере read-model: batch проход по агрегатам, не ждём пока придут events за 30 дней.
- Eventual consistency декларируется в API. OpenAPI явно пишет «возможна задержка до 1 секунды».
- Read-your-writes при необходимости — через sticky session, polling read-model в command-handler-е или альтернативный endpoint без CQRS-split.
- Никакого synchronous UPDATE read-model в command-handler. Decoupling сразу теряется.
- Никаких PG-триггеров для sync — невидимая магия, ломается на bulk-операциях, не работает cross-DB.
- Никаких schema-coupled events: event-payload — независимый record, не generated POJO write-схемы.
Сердце CQRS — это не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений, и он же определяет всё: latency, отказоустойчивость, формат данных. Outbox + Kafka — стандартная связка; всё остальное в этой статье — детали её правильного применения. Раскрытие раздела 5 гайда.
Outbox-pattern: атомарность с агрегатом
R-CQRS-SYNC-1: write-handler регистрирует событие через registerEvent агрегата; на repository.save(order) событие записывается в outbox-таблицу той же БД, в той же транзакции, что и изменение order.
COMMIT того, что изменилось:
1. UPDATE order SET status = 'CONFIRMED' WHERE id = 42
2. INSERT INTO outbox (event_type, payload, aggregate_id)
VALUES ('OrderConfirmed', '{...}', 42)
3. (всё атомарно, либо обе строки, либо ни одной)
После commit:
- Outbox-relay — отдельный scheduled bean, который раз в N миллисекунд делает
SELECT … FOR UPDATE SKIP LOCKED LIMIT 100изoutbox, публикует в Kafka, помечает как опубликованное. - Producer idempotent. Kafka обеспечивает at-least-once с дедупом на уровне partition (см. Kafka → producer).
Зачем outbox, почему нельзя «после commit пушнуть в Kafka»:
- Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется.
- Commit failed, Kafka уже получила — phantom event, в БД нет соответствующего state.
- Outbox решает обе проблемы: пока строка лежит в outbox, до Kafka она дойдёт; пока не дошла — relay будет ретраить.
Подробно — в Kafka → Outbox publishing.
Idempotent consumer
R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once в Kafka означает: одно сообщение может прийти дважды. Без защиты read-model становится несогласованной.
Два варианта защиты:
processed_event таблица
CREATE TABLE processed_event (
event_id UUID PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
if (processedEventRepository.exists(event.eventId(), "order-summary-projector")) {
return; // дубликат
}
processedEventRepository.markProcessed(event.eventId(), "order-summary-projector");
orderSummaryRepository.updateStatus(event.orderId(), "CONFIRMED", event.confirmedAt());
}
Плюс: гарантия точного дедупа. Минус: ещё одна таблица под нагрузкой записи.
Idempotent UPDATE по version
В read-таблице есть колонка version (или last_event_seq). UPDATE применяется только если событие новее:
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
int updated = orderSummaryRepository.updateStatusIfNewer(
event.orderId(),
"CONFIRMED",
event.confirmedAt(),
event.aggregateVersion()
);
if (updated == 0) {
log.debug("skip stale or duplicate event: {}", event.eventId());
}
}
UPDATE order_summary
SET status = 'CONFIRMED', confirmed_at = ?, version = ?, updated_at = NOW()
WHERE order_id = ? AND version < ?
Плюс: не нужна отдельная таблица. Минус: требует aggregateVersion в каждом событии. Подходит когда события одного агрегата идут строго по порядку (одна партиция Kafka по aggregate_id).
Bootstrap и disaster recovery — synchronous batch
R-CQRS-SYNC-3: при первом запуске нового read-store или после потери — не ждём пока события придут из Kafka. Запускаем batch-rebuild.
@Component
@RequiredArgsConstructor
public class OrderSummaryBootstrap implements ApplicationRunner {
private final OrderSummaryRepository orderSummaryRepository;
private final OrderRepository orderRepository;
@Override
public void run(ApplicationArguments args) {
if (!orderSummaryRepository.isEmpty()) {
return; // уже заполнено
}
log.info("read-model order_summary is empty — running bootstrap rebuild");
rebuildAll();
}
void rebuildAll() {
long lastId = 0;
while (true) {
List<Order> batch = orderRepository.findAllAfter(lastId, 1000);
if (batch.isEmpty()) break;
orderSummaryRepository.upsertBatch(batch.stream().map(this::toSummary).toList());
lastId = batch.getLast().id().value();
}
}
}
Не путать с обычной работой: bootstrap — разовый при создании read-store, не каждый раз при старте. Триггеры:
- Новый read-store (например, добавили ElasticSearch — он пустой).
- Disaster recovery (read-store потерян, восстанавливаем из write).
- Структурная миграция (добавили колонку, нужно дозаполнить).
Также этот же скрипт пригодится для ручного rebuild по одной записи, если consumer пропустил событие из-за бага.
Eventual consistency декларируется в API
R-CQRS-SYNC-4: если endpoint обслуживает read-model — это должно быть видно в OpenAPI. Клиент не должен догадываться.
paths:
/orders/{id}/summary:
get:
summary: Get order summary (read-projection)
description: |
Возвращает read-проекцию заказа.
Возможна задержка до 1 секунды между write-операцией и появлением
обновления в этой проекции.
Для immediate consistency (например, сразу после POST /orders)
используйте GET /orders/{id} (полный агрегат через write-store).
responses:
'200':
description: ...
Зачем:
- Клиент знает, что после
POST /ordersсразу делатьGET /orders/{id}/summary— может вернуть404или старое состояние. - Если клиенту нужна immediate consistency, он либо использует другой endpoint, либо применяет read-your-writes (см. ниже).
- В тестах и production troubleshooting eventual consistency — это не баг, а архитектурное свойство, и оно задокументировано.
Read-your-writes — три механизма
R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:
1. Sticky session в gateway
Запросы одного клиента приходят на тот же pod, который только что обработал его write. Если read-model — это локальный bean в этом же сервисе (например, in-memory cache, который consumer обновил синхронно после commit), клиент увидит свежие данные.
Не работает кросс-pod, кросс-сервис. Хрупко.
2. Polling read-model в command-handler-е
@Override
@Transactional
public OrderId handle(CreateOrderCommand cmd) {
Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
orderRepository.save(order);
// post-commit hook: ждём появления в read-model до 2 секунд
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override public void afterCommit() {
pollUntilVisible(order.id(), Duration.ofSeconds(2));
}
}
);
return order.id();
}
Контракт: 99% случаев укладывается в 2 секунды, в остальных — 504 / async-pattern.
Минус: реальный latency POST становится p99 = 2s. Не подходит для частых операций.
3. Альтернативный endpoint без CQRS-split
Самое простое: для критичных к immediate consistency сценариев — отдельный endpoint, который читает прямо из write-store.
paths:
/orders/{id}:
get:
summary: Get order (immediate consistency, from write-store)
/orders/{id}/summary:
get:
summary: Get order summary (eventual consistency, from read-projection)
В большинстве случаев это правильное решение — два endpoint'а явно показывают trade-off клиенту.
Что запрещено
Synchronous UPDATE read-model в command-handler
R-CQRS-SYNC-X1: в command-handler-е нет orderSummaryRepository.update(...) после save.
// ПЛОХО
@Transactional
public OrderId handle(ConfirmOrderCommand cmd) {
Order order = orderRepository.findById(..., FOR_UPDATE).orElseThrow(...);
order.confirm();
orderRepository.save(order);
orderSummaryRepository.updateStatus(order.id(), "CONFIRMED"); // ← синхронный sync
return order.id();
}
Что не так:
- Decoupling утрачен. ALTER TABLE
order_summaryтеперь блокирует write-транзакции. Любая авария read-store роняет write. - Atomic guarantee ложная. Если
order_summaryв другой БД, надо 2PC (запрещён). Если в той же — но через другой connection / pool — гарантия только в рамках одной транзакции. - Без outbox sync не работает между сервисами. Если read-store обслуживается другим сервисом, sync через метод нерелевантен.
Корректно: outbox в той же транзакции, что и order. Relay публикует в Kafka. Consumer обновляет order_summary.
Sync через PG triggers
R-CQRS-SYNC-X2: CREATE TRIGGER на write-таблице, который пишет в read-таблицу при UPDATE.
-- ПЛОХО
CREATE OR REPLACE FUNCTION sync_order_summary() RETURNS TRIGGER AS $$
BEGIN
UPDATE order_summary SET status = NEW.status WHERE order_id = NEW.id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER order_summary_sync AFTER UPDATE ON "order"
FOR EACH ROW EXECUTE FUNCTION sync_order_summary();
Что не так:
- Невидимая магия. Программисты, читающие Java-код, не подозревают, что UPDATE
orderобновляет ещё иorder_summary. - Не работает на bulk.
UPDATE order SET status = ... WHERE created_at < ?запустит триггер на каждую строку — медленно. - Не масштабируется на cross-DB / cross-service.
- Тестировать сложно. Юнит-тесты не видят триггер, integration-тесты должны его подгрузить.
Корректно: явный consumer на событие в Java-коде. Tracebale, тестируемо, масштабируемо.
Schema-coupled events
R-CQRS-SYNC-X3: event-payload не должен быть generated POJO write-схемы. Любой ALTER TABLE на write-side ломает consumers.
// ПЛОХО — event payload содержит generated row write-таблицы
public record OrderConfirmed(OrderRecord row) {}
// где OrderRecord — jooq-generated класс, отражающий точную структуру таблицы
// ХОРОШО — независимый event-record, версионируется отдельно
public record OrderConfirmed(
UUID eventId,
Long orderId,
OrderStatus status,
Instant confirmedAt,
long aggregateVersion
) {}
Зачем независимый record:
- Можно ALTER write-таблицу (добавить колонку, переименовать существующую) без поломки consumers.
- Можно версионировать event (
OrderConfirmedV1→OrderConfirmedV2) explicitly. - Можно сериализовать события одинаково для in-process и cross-service consumers.
Подробно — в Kafka → Event versioning.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Synchronous UPDATE read-model в command-handler | R-CQRS-SYNC-X1 | Outbox + Kafka + consumer |
| PG-trigger sync write-таблицы → read-таблицы | R-CQRS-SYNC-X2 | Явный consumer в Java |
| Event-payload — generated POJO write-схемы | R-CQRS-SYNC-X3 | Независимый record с версионированием |
| Consumer без idempotency-защиты | R-CQRS-SYNC-2 | processed_event или idempotent UPDATE по version |
| Eventual consistency не декларирована в API | R-CQRS-SYNC-4 | Явный description в OpenAPI |
| Ожидание заполнения read-model от Kafka на bootstrap | R-CQRS-SYNC-3 | Batch-rebuild из write-store |
Куда дальше
- CQRS → раздел 5. Синхронизация через события — нормативные
R-CQRS-SYNC-*. - Read-model — где и в каком виде хранится проекция.
- Command side — как outbox-событие регистрируется в write-handler.
- Kafka Style Guide — outbox publishing, idempotent consumer, retry topic.
- Distributed Patterns Style Guide — почему 2PC запрещён и нужна eventual consistency.
- REST API Style Guide — как декларировать eventual consistency в OpenAPI.
- PG Runtime Style Guide — outbox-таблица, FOR UPDATE SKIP LOCKED.