← назад к разделу

В CQRS данные хранятся в двух местах: write-store (агрегаты) и read-store (проекции для запросов). Как только что-то меняется на write-стороне, read-сторона должна это узнать. Именно здесь чаще всего возникают проблемы — если сделать синхронизацию неправильно, данные расходятся, и приложение начинает врать пользователям.

В этой статье — надёжный способ синхронизировать read-model через события: outbox-паттерн, Kafka и идемпотентный consumer.

Почему нельзя просто обновить read-model в той же транзакции

Первый порыв — добавить в command-handler строчку, которая обновляет read-model сразу после сохранения агрегата:

@Transactional
public OrderId handle(ConfirmOrderCommand cmd) {
    Order order = orderRepository.findById(cmd.orderId());
    order.confirm();
    orderRepository.save(order);
    orderSummaryRepository.updateStatus(order.id(), "CONFIRMED"); // так не надо
    return order.id();
}

На первый взгляд удобно. На практике — несколько серьёзных последствий:

  • Если read-store лежит в другой базе данных, атомарности нет. Одна операция пройдёт, другая — нет. Данные разойдутся.
  • Read-store и write-store становятся жёстко связаны. Изменение структуры read-таблицы затрагивает write-транзакции. Avaria в read-store роняет весь запрос на запись.
  • Синхронизация между сервисами через метод не работает вообще.

Решение — разорвать связь через событие.

Outbox-паттерн: событие и агрегат в одной транзакции

Главная идея outbox: вместо того чтобы сразу слать событие в Kafka, записываем его в таблицу outboxв той же базе данных и той же транзакции, что и изменение агрегата.

Одна транзакция:
  1. UPDATE order SET status = 'CONFIRMED' WHERE id = 42
  2. INSERT INTO outbox (event_type, payload, aggregate_id)
       VALUES ('OrderConfirmed', '{...}', 42)
  → либо обе строки, либо ни одной

После того как транзакция прошла, в дело вступает outbox-relay — отдельный scheduled-бин, который периодически читает непрочитанные строки из outbox и публикует их в Kafka:

@Scheduled(fixedDelay = 200)
@Transactional
public void relay() {
    List<OutboxRecord> pending = outboxRepository.findPending(100);
    for (OutboxRecord record : pending) {
        kafkaTemplate.send(record.topic(), record.key(), record.payload());
        outboxRepository.markPublished(record.id());
    }
}

Зачем именно так? Без outbox есть два неприятных сценария:

  • Транзакция прошла, Kafka недоступна — событие потеряно, read-model рассинхронизирована.
  • Kafka получила событие, транзакция откатилась — в read-model появляется изменение, которого нет в write-store.

Outbox решает оба: событие живёт в базе данных до тех пор, пока relay не убедится, что оно дошло до Kafka.

Idempotent consumer: что делать с дублями

Kafka гарантирует доставку как минимум один раз (at-least-once). Это означает: одно и то же событие consumer может получить дважды. Без защиты read-model будет испорчена.

Есть два способа сделать consumer идемпотентным.

Таблица обработанных событий

Отдельная таблица хранит ID уже обработанных событий. Перед обновлением read-model проверяем, не видели ли мы это событие раньше:

CREATE TABLE processed_event (
    event_id     UUID PRIMARY KEY,
    consumer     TEXT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
    if (processedEventRepository.exists(event.eventId(), "order-summary-projector")) {
        return; // уже обрабатывали
    }
    processedEventRepository.markProcessed(event.eventId(), "order-summary-projector");
    orderSummaryRepository.updateStatus(event.orderId(), "CONFIRMED", event.confirmedAt());
}

Надёжно, но требует дополнительной таблицы с нагрузкой на запись.

Idempotent UPDATE по версии агрегата

Если в каждом событии есть aggregateVersion, UPDATE применяется только тогда, когда событие новее текущего состояния read-model:

@KafkaListener(topics = "order.events", groupId = "order-summary-projector")
@Transactional
public void onOrderConfirmed(@Payload OrderConfirmed event) {
    int updated = orderSummaryRepository.updateStatusIfNewer(
        event.orderId(),
        "CONFIRMED",
        event.confirmedAt(),
        event.aggregateVersion()
    );
    if (updated == 0) {
        log.debug("пропускаем устаревшее или дублирующее событие: {}", event.eventId());
    }
}
UPDATE order_summary
SET status = 'CONFIRMED', confirmed_at = ?, version = ?, updated_at = NOW()
WHERE order_id = ? AND version < ?

Не нужна отдельная таблица, но требует, чтобы события одного агрегата приходили в порядке (одна партиция Kafka по aggregate_id).

Bootstrap: что делать, когда read-model пустая

Представьте: вы добавили новую проекцию — например, начали хранить сводку по заказам в Elasticsearch. Или read-store был потерян и его нужно восстановить. Ждать, пока через Kafka придут события за последние 30 дней — неправильно.

Решение — пакетное восстановление: при старте приложения проверяем, пуста ли read-model, и если да — заполняем её из write-store напрямую:

@Component
@RequiredArgsConstructor
public class OrderSummaryBootstrap implements ApplicationRunner {

    private final OrderSummaryRepository orderSummaryRepository;
    private final OrderRepository orderRepository;

    @Override
    public void run(ApplicationArguments args) {
        if (!orderSummaryRepository.isEmpty()) {
            return; // уже заполнено, пропускаем
        }
        log.info("read-model order_summary пуста — запускаем rebuild");
        rebuildAll();
    }

    void rebuildAll() {
        long lastId = 0;
        while (true) {
            List<Order> batch = orderRepository.findAllAfter(lastId, 1000);
            if (batch.isEmpty()) break;
            orderSummaryRepository.upsertBatch(batch.stream().map(this::toSummary).toList());
            lastId = batch.getLast().id().value();
        }
    }
}

Rebuild нужен в трёх случаях:

  • Новая read-model (только что создана, данных нет).
  • Аварийное восстановление (read-store потерян или повреждён).
  • Структурная миграция (добавили колонку, нужно дозаполнить существующие записи).

Eventual consistency: скажи об этом явно

Когда read-model обновляется асинхронно, между записью и появлением изменения в проекции проходит время — обычно меньше секунды, но это не гарантия. Клиент должен знать об этом заранее — иначе баги, которые невозможно воспроизвести.

Правило простое: если endpoint обслуживает read-model, укажите это в документации API:

paths:
  /orders/{id}/summary:
    get:
      summary: Сводка заказа (read-проекция)
      description: |
        Возвращает read-проекцию заказа.
        Возможна задержка до 1 секунды между изменением заказа и обновлением 
        этой проекции.
        Для немедленной согласованности (например, сразу после POST /orders) 
        используйте GET /orders/{id}.

Клиент понимает: сразу после создания заказа запрос к /summary может вернуть старые данные или 404. Это не баг — это архитектурное свойство.

Read-your-writes: когда нужно гарантировать свежесть

Иногда пользователь должен сразу увидеть результат своего действия. Например: создал заказ → открыл список заказов → увидел его. Три способа справиться с этим, от простого к сложному.

Два endpoint'а с разными гарантиями

Самый прямолинейный подход: один endpoint читает из write-store (мгновенно согласованный), другой — из read-model (eventual consistency):

/orders/{id}:
  get:
    summary: Заказ (из write-store, мгновенная согласованность)

/orders/{id}/summary:
  get:
    summary: Сводка заказа (из read-проекции, eventual consistency)

Клиент сам выбирает, что ему нужно. Никакой магии, всё явно.

Sticky session через gateway

Запросы одного клиента направляются на один и тот же pod. Если read-model обновляется локально в памяти — клиент сразу видит свежие данные. Работает только в рамках одного pod, хрупко при перезапусках.

Polling после записи

Command-handler после коммита ждёт, пока изменение появится в read-model:

@Transactional
public OrderId handle(CreateOrderCommand cmd) {
    Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
    orderRepository.save(order);
    TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronization() {
            @Override public void afterCommit() {
                pollUntilVisible(order.id(), Duration.ofSeconds(2));
            }
        }
    );
    return order.id();
}

Минус: POST-запрос становится медленным (до 2 секунд в худшем случае). Подходит только для редких критичных операций.

Частые ошибки

Обновление read-model в command-handler. Когда write-side и read-side связаны напрямую через метод, отказоустойчивость теряется, а распределённый сценарий (разные сервисы) работать не будет. Правильно — через outbox и consumer.

PG-триггеры для синхронизации. Соблазнительно: поставил триггер на write-таблицу, и он сам обновляет read-таблицу. Но триггер — невидимая магия: программист читает Java-код и не знает, что UPDATE на одну таблицу пишет в другую. На массовых операциях триггер работает на каждую строку — медленно. Не работает между разными базами и сервисами.

Event как слепок write-таблицы. Если событие — это просто POJO, сгенерированный из структуры write-таблицы, любое изменение схемы сломает consumer'ов. Событие должно быть независимым record'ом:

// Зависимый от схемы — опасно
public record OrderConfirmed(OrderRecord row) {}

// Независимый — правильно
public record OrderConfirmed(
    UUID eventId,
    Long orderId,
    OrderStatus status,
    Instant confirmedAt,
    long aggregateVersion
) {}

Независимый record можно версионировать (OrderConfirmedV2), можно менять write-схему без поломки consumer'ов.

Consumer без защиты от дублей. Kafka доставляет at-least-once. Без idempotency read-model рано или поздно испортится.

Коротко

  • Синхронизация write → read идёт через outbox + Kafka, не напрямую. Outbox записывается в той же транзакции, что и агрегат — гарантия атомарности.
  • Outbox-relay читает непубличные записи из outbox и публикует в Kafka. Пока не опубликовано — будет ретраить.
  • Idempotent consumer обязателен: Kafka может доставить событие дважды. Защита — таблица processed_event или UPDATE по версии агрегата.
  • Bootstrap-rebuild нужен при создании новой read-model или после аварии: проход по write-store пачками, а не ожидание старых событий из Kafka.
  • Eventual consistency декларируется в API: клиент должен знать, что после записи проекция обновится с задержкой.
  • Read-your-writes: два endpoint'а с разными гарантиями — самое простое решение. Polling и sticky session — для особых случаев.
  • Event — независимый record, не слепок write-таблицы. Иначе любая миграция схемы ломает consumer'ов.

Что почитать дальше

  • CQRS: разделение записи и чтения — основы паттерна.
  • Read-model в CQRS: где и как хранить проекцию — варианты хранилищ и структура проекций.
  • Command side в CQRS — как outbox-событие регистрируется в write-handler.