В CQRS данные хранятся в двух местах: write-store (агрегаты) и read-store (проекции для запросов). Как только что-то меняется на write-стороне, read-сторона должна это узнать. Именно здесь чаще всего возникают проблемы — если сделать синхронизацию неправильно, данные расходятся, и приложение начинает врать пользователям.
В этой статье — надёжный способ синхронизировать read-model через события: outbox-паттерн, Kafka и идемпотентный consumer.
Почему нельзя просто обновить read-model в той же транзакции
Первый порыв — добавить в command-handler строчку, которая обновляет read-model сразу после сохранения агрегата:
@Transactional
public OrderId handle(ConfirmOrderCommand cmd) {
Order order = orderRepository.findById(cmd.orderId());
order.confirm();
orderRepository.save(order);
orderSummaryRepository.updateStatus(order.id(), "CONFIRMED"); // так не надо
return order.id();
}
На первый взгляд удобно. На практике — несколько серьёзных последствий:
- Если read-store лежит в другой базе данных, атомарности нет. Одна операция пройдёт, другая — нет. Данные разойдутся.
- Read-store и write-store становятся жёстко связаны. Изменение структуры read-таблицы затрагивает write-транзакции. Avaria в read-store роняет весь запрос на запись.
- Синхронизация между сервисами через метод не работает вообще.
Решение — разорвать связь через событие.
Outbox-паттерн: событие и агрегат в одной транзакции
Главная идея outbox: вместо того чтобы сразу слать событие в Kafka, записываем его в таблицу outbox — в той же базе данных и той же транзакции, что и изменение агрегата.
Одна транзакция:
1. UPDATE order SET status = 'CONFIRMED' WHERE id = 42
2. INSERT INTO outbox (event_type, payload, aggregate_id)
VALUES ('OrderConfirmed', '{...}', 42)
→ либо обе строки, либо ни одной
После того как транзакция прошла, в дело вступает outbox-relay — отдельный scheduled-бин, который периодически читает непрочитанные строки из outbox и публикует их в Kafka:
@Scheduled(fixedDelay = 200)
@Transactional
public void relay() {
List<OutboxRecord> pending = outboxRepository.findPending(100);
for (OutboxRecord record : pending) {
kafkaTemplate.send(record.topic(), record.key(), record.payload());
outboxRepository.markPublished(record.id());
}
}
Зачем именно так? Без outbox есть два неприятных сценария:
- Транзакция прошла, Kafka недоступна — событие потеряно, read-model рассинхронизирована.
- Kafka получила событие, транзакция откатилась — в read-model появляется изменение, которого нет в write-store.
Outbox решает оба: событие живёт в базе данных до тех пор, пока relay не убедится, что оно дошло до Kafka.
Idempotent consumer: что делать с дублями
Kafka гарантирует доставку как минимум один раз (at-least-once). Это означает: одно и то же событие consumer может получить дважды. Без защиты read-model будет испорчена.
Есть два способа сделать consumer идемпотентным.
Таблица обработанных событий
Отдельная таблица хранит ID уже обработанных событий. Перед обновлением read-model проверяем, не видели ли мы это событие раньше:
CREATE TABLE processed_event (
event_id UUID PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
if (processedEventRepository.exists(event.eventId(), "order-summary-projector")) {
return; // уже обрабатывали
}
processedEventRepository.markProcessed(event.eventId(), "order-summary-projector");
orderSummaryRepository.updateStatus(event.orderId(), "CONFIRMED", event.confirmedAt());
}
Надёжно, но требует дополнительной таблицы с нагрузкой на запись.
Idempotent UPDATE по версии агрегата
Если в каждом событии есть aggregateVersion, UPDATE применяется только тогда, когда событие новее текущего состояния read-model:
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
int updated = orderSummaryRepository.updateStatusIfNewer(
event.orderId(),
"CONFIRMED",
event.confirmedAt(),
event.aggregateVersion()
);
if (updated == 0) {
log.debug("пропускаем устаревшее или дублирующее событие: {}", event.eventId());
}
}
UPDATE order_summary
SET status = 'CONFIRMED', confirmed_at = ?, version = ?, updated_at = NOW()
WHERE order_id = ? AND version < ?
Не нужна отдельная таблица, но требует, чтобы события одного агрегата приходили в порядке (одна партиция Kafka по aggregate_id).
Bootstrap: что делать, когда read-model пустая
Представьте: вы добавили новую проекцию — например, начали хранить сводку по заказам в Elasticsearch. Или read-store был потерян и его нужно восстановить. Ждать, пока через Kafka придут события за последние 30 дней — неправильно.
Решение — пакетное восстановление: при старте приложения проверяем, пуста ли read-model, и если да — заполняем её из write-store напрямую:
@Component
@RequiredArgsConstructor
public class OrderSummaryBootstrap implements ApplicationRunner {
private final OrderSummaryRepository orderSummaryRepository;
private final OrderRepository orderRepository;
@Override
public void run(ApplicationArguments args) {
if (!orderSummaryRepository.isEmpty()) {
return; // уже заполнено, пропускаем
}
log.info("read-model order_summary пуста — запускаем rebuild");
rebuildAll();
}
void rebuildAll() {
long lastId = 0;
while (true) {
List<Order> batch = orderRepository.findAllAfter(lastId, 1000);
if (batch.isEmpty()) break;
orderSummaryRepository.upsertBatch(batch.stream().map(this::toSummary).toList());
lastId = batch.getLast().id().value();
}
}
}
Rebuild нужен в трёх случаях:
- Новая read-model (только что создана, данных нет).
- Аварийное восстановление (read-store потерян или повреждён).
- Структурная миграция (добавили колонку, нужно дозаполнить существующие записи).
Eventual consistency: скажи об этом явно
Когда read-model обновляется асинхронно, между записью и появлением изменения в проекции проходит время — обычно меньше секунды, но это не гарантия. Клиент должен знать об этом заранее — иначе баги, которые невозможно воспроизвести.
Правило простое: если endpoint обслуживает read-model, укажите это в документации API:
paths:
/orders/{id}/summary:
get:
summary: Сводка заказа (read-проекция)
description: |
Возвращает read-проекцию заказа.
Возможна задержка до 1 секунды между изменением заказа и обновлением
этой проекции.
Для немедленной согласованности (например, сразу после POST /orders)
используйте GET /orders/{id}.
Клиент понимает: сразу после создания заказа запрос к /summary может вернуть старые данные или 404. Это не баг — это архитектурное свойство.
Read-your-writes: когда нужно гарантировать свежесть
Иногда пользователь должен сразу увидеть результат своего действия. Например: создал заказ → открыл список заказов → увидел его. Три способа справиться с этим, от простого к сложному.
Два endpoint'а с разными гарантиями
Самый прямолинейный подход: один endpoint читает из write-store (мгновенно согласованный), другой — из read-model (eventual consistency):
/orders/{id}:
get:
summary: Заказ (из write-store, мгновенная согласованность)
/orders/{id}/summary:
get:
summary: Сводка заказа (из read-проекции, eventual consistency)
Клиент сам выбирает, что ему нужно. Никакой магии, всё явно.
Sticky session через gateway
Запросы одного клиента направляются на один и тот же pod. Если read-model обновляется локально в памяти — клиент сразу видит свежие данные. Работает только в рамках одного pod, хрупко при перезапусках.
Polling после записи
Command-handler после коммита ждёт, пока изменение появится в read-model:
@Transactional
public OrderId handle(CreateOrderCommand cmd) {
Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
orderRepository.save(order);
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override public void afterCommit() {
pollUntilVisible(order.id(), Duration.ofSeconds(2));
}
}
);
return order.id();
}
Минус: POST-запрос становится медленным (до 2 секунд в худшем случае). Подходит только для редких критичных операций.
Частые ошибки
Обновление read-model в command-handler. Когда write-side и read-side связаны напрямую через метод, отказоустойчивость теряется, а распределённый сценарий (разные сервисы) работать не будет. Правильно — через outbox и consumer.
PG-триггеры для синхронизации. Соблазнительно: поставил триггер на write-таблицу, и он сам обновляет read-таблицу. Но триггер — невидимая магия: программист читает Java-код и не знает, что UPDATE на одну таблицу пишет в другую. На массовых операциях триггер работает на каждую строку — медленно. Не работает между разными базами и сервисами.
Event как слепок write-таблицы. Если событие — это просто POJO, сгенерированный из структуры write-таблицы, любое изменение схемы сломает consumer'ов. Событие должно быть независимым record'ом:
// Зависимый от схемы — опасно
public record OrderConfirmed(OrderRecord row) {}
// Независимый — правильно
public record OrderConfirmed(
UUID eventId,
Long orderId,
OrderStatus status,
Instant confirmedAt,
long aggregateVersion
) {}
Независимый record можно версионировать (OrderConfirmedV2), можно менять write-схему без поломки consumer'ов.
Consumer без защиты от дублей. Kafka доставляет at-least-once. Без idempotency read-model рано или поздно испортится.
Коротко
- Синхронизация write → read идёт через outbox + Kafka, не напрямую. Outbox записывается в той же транзакции, что и агрегат — гарантия атомарности.
- Outbox-relay читает непубличные записи из outbox и публикует в Kafka. Пока не опубликовано — будет ретраить.
- Idempotent consumer обязателен: Kafka может доставить событие дважды. Защита — таблица
processed_eventили UPDATE по версии агрегата. - Bootstrap-rebuild нужен при создании новой read-model или после аварии: проход по write-store пачками, а не ожидание старых событий из Kafka.
- Eventual consistency декларируется в API: клиент должен знать, что после записи проекция обновится с задержкой.
- Read-your-writes: два endpoint'а с разными гарантиями — самое простое решение. Polling и sticky session — для особых случаев.
- Event — независимый record, не слепок write-таблицы. Иначе любая миграция схемы ломает consumer'ов.
Что почитать дальше
- CQRS: разделение записи и чтения — основы паттерна.
- Read-model в CQRS: где и как хранить проекцию — варианты хранилищ и структура проекций.
- Command side в CQRS — как outbox-событие регистрируется в write-handler.