Опирается на правила: R-DIST-OBX-1R-DIST-OBX-3 и R-DIST-OBX-X1R-DIST-OBX-X2 из Distributed Patterns Style Guide → раздел 5. Outbox + Inbox.

Важно знать

  • Outbox решает «БД commit + message publish атомарно»: одна локальная транзакция в PG.
  • Команда commits в БД и INSERT в outbox_event атомарно; outbox-relay (@Scheduled с FOR UPDATE SKIP LOCKED) публикует в Kafka и помечает published_at.
  • Inbox — обратное: consumer пишет полученное сообщение в inbox_event с processed=false, отдельный handler обрабатывает. Используется только для критических сценариев — в большинстве случаев достаточно processed_event dedup.
  • Single source of truth — БД сервиса. Kafka — транспорт сообщений, не источник правды.
  • При потере Kafka-данных — outbox-таблица продолжает накапливать, после восстановления Kafka — публикует.
  • Запрет direct send из command-handler в Kafka — KafkaTemplate.send() в @Transactional не атомарен с DB commit.
  • Запрет @TransactionalEventListener для Kafka — событие отправится даже если DB commit упал.

Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox, без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.

Проблема «commit + publish»

Что хочется:

@Transactional
public Order handle(CreateOrderCommand command) {
    var order = orderRepository.save(new Order(command));
    kafkaTemplate.send("order.events", new OrderCreatedEvent(order));
    return order;
}

Что не работает:

  • DB commit прошёл, Kafka send упал → событие потеряно, downstream не знает.
  • Kafka send прошёл, DB commit упал → событие отправлено, но заказа в БД нет → downstream обрабатывает фантом.
  • Network partition между этими двумя — гарантий никаких.

Distributed transaction между PG и Kafka не существует (Kafka не поддерживает XA). Outbox решает это через локальную транзакцию в PG.

Outbox pattern

R-DIST-OBX-1: outbox для исходящих событий обязателен.

Схема outbox-таблицы

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid NOT NULL UNIQUE,
    aggregate_type  text NOT NULL,
    aggregate_id    text NOT NULL,
    event_type      text NOT NULL,
    payload         jsonb NOT NULL,
    topic           text NOT NULL,
    partition_key   text NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);
CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id) WHERE published_at IS NULL;

Partial index WHERE published_at IS NULL — relay сканирует только unpublished. После публикации row остаётся в таблице (для audit), но из «горячего» индекса исчезает.

Write-handler

@UseCase
@RequiredArgsConstructor
public class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, Order> {

    private final OrderRepository orderRepository;
    private final OutboxEventPublisher outboxEventPublisher;

    @Override
    @Transactional
    public Order handle(CreateOrderCommand command) {
        var order = orderRepository.save(Order.create(command));

        outboxEventPublisher.publish(
            "order.events",
            order.id().toString(),
            new OrderCreatedEvent(
                UUID.randomUUID(),
                order.id(),
                order.customerId(),
                order.amount(),
                Instant.now()
            )
        );
        return order;
    }
}

OutboxEventPublisher.publish делает INSERT INTO outbox_event в той же транзакции. Либо INSERT order и INSERT outbox_event оба commit, либо оба откатываются.

Outbox-relay

@Component
@RequiredArgsConstructor
public class OutboxRelay {

    private final DSLContext dsl;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(fixedDelay = 500)
    public void publish() {
        dsl.transaction(tx -> {
            var batch = tx.dsl()
                .selectFrom(OUTBOX_EVENT)
                .where(OUTBOX_EVENT.PUBLISHED_AT.isNull())
                .orderBy(OUTBOX_EVENT.ID)
                .limit(100)
                .forUpdate().skipLocked()
                .fetch();

            for (var row : batch) {
                kafkaTemplate.send(row.getTopic(), row.getPartitionKey(), row.getPayload()).get();
                row.setPublishedAt(Instant.now());
                row.store();
            }
        });
    }
}

FOR UPDATE SKIP LOCKED — несколько инстансов relay-я могут гонять параллельно: каждый берёт свою порцию unpublished, не блокируя других. Это даёт горизонтальное масштабирование без координации.

После публикации published_at = now(). Если relay упал между kafkaTemplate.send и store — следующий запуск возьмёт это же событие и опубликует снова. At-least-once, ровно поэтому receiver обязан быть идемпотентным.

Single source of truth

R-DIST-OBX-3: БД сервиса — единственный источник правды. Kafka — транспорт.

Что это даёт:

  • Потеря Kafka-данных (например, retention истёк, broker сломался) — outbox-таблица продолжает накапливать, после восстановления Kafka relay публикует пропущенное.
  • Rebuild read-projection — можно скриптом перепрочитать всю outbox_event таблицу и опубликовать события заново. Read-side reconsume их идемпотентно.
  • Audit — каждое событие, которое сервис когда-либо порождал, остаётся в БД. История полная.

Сравнить с подходом «Kafka — source of truth» (Event Sourcing): сложнее в эксплуатации, требует Kafka Connect / KStreams, retention становится критичным. UCP выбирает PG как SoT.

Inbox pattern

R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_event с processed=false, отдельный handler обрабатывает unprocessed-rows в локальных транзакциях.

CREATE TABLE inbox_event (
    event_id      uuid PRIMARY KEY,
    received_at   timestamptz NOT NULL DEFAULT now(),
    payload       jsonb NOT NULL,
    processed     boolean NOT NULL DEFAULT false,
    processed_at  timestamptz
);
CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at) WHERE NOT processed;
@KafkaListener(topics = "payment.events")
@Transactional
public void onPaymentEvent(PaymentEvent event) {
    inboxEventRepository.tryInsert(event.eventId(), event.payload());
}

@Scheduled(fixedDelay = 200)
public void processInbox() {
    var batch = inboxEventRepository.fetchUnprocessed(100);
    for (var row : batch) {
        processEventInLocalTransaction(row);
    }
}

Когда использовать inbox

  • Bursty traffic — Kafka даёт burst 10k msg/s, обработка тяжёлая (5 ms × 10k = 50 секунд). Inbox развязывает приём и обработку: consumer принимает быстро, handler работает в своём темпе.
  • Critical financial flows — нужно жёстко гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при крэше.

В большинстве случаев inbox избыточен — достаточно processed_event dedup-таблицы. Если обработка лёгкая и helper-ов retry topic + DLQ хватает — inbox добавляет сложности без пользы.

КритерийТолько processed_eventInbox + handler
Сложностьнизкаясредняя
Burst handlingограничен concurrency consumer-аразвязка приёма и обработки
Recovery после крэшаre-consume из Kafkaотдельная обработка inbox
Когда применятьдефолтfinancial / high burst

Что запрещено

Direct send из command-handler

R-DIST-OBX-X1: KafkaTemplate.send() внутри @Transactional write-handler-а — не атомарен с DB commit.

// ПЛОХО — нет атомарности
@Transactional
public Order handle(CreateOrderCommand command) {
    var order = orderRepository.save(...);
    kafkaTemplate.send("order.events", new OrderCreatedEvent(order));
    return order;
}

Send происходит до commit. Если transaction откатилась после send (constraint violation, deadlock, что угодно) — событие уже в Kafka, заказа в БД нет. См. R-KFK-PROD-X4.

@TransactionalEventListener для Kafka

R-DIST-OBX-X2: ещё один опасный паттерн — отправка в AFTER_COMMIT callback.

// ПЛОХО — событие отправится даже если Kafka недоступна, но retry/dedup не работают
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderCreated(OrderCreatedDomainEvent event) {
    kafkaTemplate.send("order.events", event);
}

Проблемы:

  • Если Kafka недоступна в момент AFTER_COMMIT — exception в listener-е пропадает в логе, событие потеряно, ни retry, ни recovery.
  • DB commit прошёл, событие не отправилось — downstream не знает, состояние рассинхронизировано.

Корректно — outbox-таблица: запись синхронно в той же транзакции, публикация асинхронно с retry. См. R-KFK-OBX-X2.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Direct KafkaTemplate.send в command-handlerR-DIST-OBX-X1outbox-таблица + relay
@TransactionalEventListener для KafkaR-DIST-OBX-X2outbox-relay
Outbox-relay без SKIP LOCKEDR-DIST-OBX-1FOR UPDATE SKIP LOCKED для конкурентных relay-ев
Outbox без partial-index unpublishedR-DIST-OBX-1WHERE published_at IS NULL
Kafka как source of truthR-DIST-OBX-3PG sole SoT, Kafka транспорт
Inbox для каждого consumer-аR-DIST-OBX-2processed_event dedup дефолт, inbox только для critical
DELETE published rows из outboxR-DIST-OBX-3hold для audit/rebuild, cleanup отдельным job-ом по retention

Куда дальше

  • Distributed Patterns → раздел 5. Outbox + Inbox — нормативные формулировки.
  • Idempotency — receiver обязан быть идемпотентным при at-least-once.
  • Saga — saga-state-таблица и outbox работают вместе.
  • Eventual consistency — outbox — главный механизм EC.
  • Compensation — compensation-команды публикуются через outbox.
  • Kafka → outbox publishing — детальная конфигурация relay-я.
  • PG runtime → outbox-relay — FOR UPDATE SKIP LOCKED, lock_timeout, batching.