Опирается на правила:
R-DIST-OBX-1…R-DIST-OBX-3иR-DIST-OBX-X1…R-DIST-OBX-X2из Distributed Patterns Style Guide → раздел 5. Outbox + Inbox.
Важно знать
- Outbox решает «БД commit + message publish атомарно»: одна локальная транзакция в PG.
- Команда commits в БД и INSERT в
outbox_eventатомарно; outbox-relay (@ScheduledсFOR UPDATE SKIP LOCKED) публикует в Kafka и помечаетpublished_at.- Inbox — обратное: consumer пишет полученное сообщение в
inbox_eventсprocessed=false, отдельный handler обрабатывает. Используется только для критических сценариев — в большинстве случаев достаточноprocessed_eventdedup.- Single source of truth — БД сервиса. Kafka — транспорт сообщений, не источник правды.
- При потере Kafka-данных — outbox-таблица продолжает накапливать, после восстановления Kafka — публикует.
- Запрет direct send из command-handler в Kafka —
KafkaTemplate.send()в@Transactionalне атомарен с DB commit.- Запрет
@TransactionalEventListenerдля Kafka — событие отправится даже если DB commit упал.
Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox, без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.
Проблема «commit + publish»
Что хочется:
@Transactional
public Order handle(CreateOrderCommand command) {
var order = orderRepository.save(new Order(command));
kafkaTemplate.send("order.events", new OrderCreatedEvent(order));
return order;
}
Что не работает:
- DB commit прошёл, Kafka send упал → событие потеряно, downstream не знает.
- Kafka send прошёл, DB commit упал → событие отправлено, но заказа в БД нет → downstream обрабатывает фантом.
- Network partition между этими двумя — гарантий никаких.
Distributed transaction между PG и Kafka не существует (Kafka не поддерживает XA). Outbox решает это через локальную транзакцию в PG.
Outbox pattern
R-DIST-OBX-1: outbox для исходящих событий обязателен.
Схема outbox-таблицы
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id) WHERE published_at IS NULL;
Partial index WHERE published_at IS NULL — relay сканирует только unpublished. После публикации row остаётся в таблице (для audit), но из «горячего» индекса исчезает.
Write-handler
@UseCase
@RequiredArgsConstructor
public class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, Order> {
private final OrderRepository orderRepository;
private final OutboxEventPublisher outboxEventPublisher;
@Override
@Transactional
public Order handle(CreateOrderCommand command) {
var order = orderRepository.save(Order.create(command));
outboxEventPublisher.publish(
"order.events",
order.id().toString(),
new OrderCreatedEvent(
UUID.randomUUID(),
order.id(),
order.customerId(),
order.amount(),
Instant.now()
)
);
return order;
}
}
OutboxEventPublisher.publish делает INSERT INTO outbox_event в той же транзакции. Либо INSERT order и INSERT outbox_event оба commit, либо оба откатываются.
Outbox-relay
@Component
@RequiredArgsConstructor
public class OutboxRelay {
private final DSLContext dsl;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Scheduled(fixedDelay = 500)
public void publish() {
dsl.transaction(tx -> {
var batch = tx.dsl()
.selectFrom(OUTBOX_EVENT)
.where(OUTBOX_EVENT.PUBLISHED_AT.isNull())
.orderBy(OUTBOX_EVENT.ID)
.limit(100)
.forUpdate().skipLocked()
.fetch();
for (var row : batch) {
kafkaTemplate.send(row.getTopic(), row.getPartitionKey(), row.getPayload()).get();
row.setPublishedAt(Instant.now());
row.store();
}
});
}
}
FOR UPDATE SKIP LOCKED — несколько инстансов relay-я могут гонять параллельно: каждый берёт свою порцию unpublished, не блокируя других. Это даёт горизонтальное масштабирование без координации.
После публикации published_at = now(). Если relay упал между kafkaTemplate.send и store — следующий запуск возьмёт это же событие и опубликует снова. At-least-once, ровно поэтому receiver обязан быть идемпотентным.
Single source of truth
R-DIST-OBX-3: БД сервиса — единственный источник правды. Kafka — транспорт.
Что это даёт:
- Потеря Kafka-данных (например, retention истёк, broker сломался) — outbox-таблица продолжает накапливать, после восстановления Kafka relay публикует пропущенное.
- Rebuild read-projection — можно скриптом перепрочитать всю
outbox_eventтаблицу и опубликовать события заново. Read-side reconsume их идемпотентно. - Audit — каждое событие, которое сервис когда-либо порождал, остаётся в БД. История полная.
Сравнить с подходом «Kafka — source of truth» (Event Sourcing): сложнее в эксплуатации, требует Kafka Connect / KStreams, retention становится критичным. UCP выбирает PG как SoT.
Inbox pattern
R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_event с processed=false, отдельный handler обрабатывает unprocessed-rows в локальных транзакциях.
CREATE TABLE inbox_event (
event_id uuid PRIMARY KEY,
received_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
processed boolean NOT NULL DEFAULT false,
processed_at timestamptz
);
CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at) WHERE NOT processed;
@KafkaListener(topics = "payment.events")
@Transactional
public void onPaymentEvent(PaymentEvent event) {
inboxEventRepository.tryInsert(event.eventId(), event.payload());
}
@Scheduled(fixedDelay = 200)
public void processInbox() {
var batch = inboxEventRepository.fetchUnprocessed(100);
for (var row : batch) {
processEventInLocalTransaction(row);
}
}
Когда использовать inbox
- Bursty traffic — Kafka даёт burst 10k msg/s, обработка тяжёлая (5 ms × 10k = 50 секунд). Inbox развязывает приём и обработку: consumer принимает быстро, handler работает в своём темпе.
- Critical financial flows — нужно жёстко гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при крэше.
В большинстве случаев inbox избыточен — достаточно processed_event dedup-таблицы. Если обработка лёгкая и helper-ов retry topic + DLQ хватает — inbox добавляет сложности без пользы.
| Критерий | Только processed_event | Inbox + handler |
|---|---|---|
| Сложность | низкая | средняя |
| Burst handling | ограничен concurrency consumer-а | развязка приёма и обработки |
| Recovery после крэша | re-consume из Kafka | отдельная обработка inbox |
| Когда применять | дефолт | financial / high burst |
Что запрещено
Direct send из command-handler
R-DIST-OBX-X1: KafkaTemplate.send() внутри @Transactional write-handler-а — не атомарен с DB commit.
// ПЛОХО — нет атомарности
@Transactional
public Order handle(CreateOrderCommand command) {
var order = orderRepository.save(...);
kafkaTemplate.send("order.events", new OrderCreatedEvent(order));
return order;
}
Send происходит до commit. Если transaction откатилась после send (constraint violation, deadlock, что угодно) — событие уже в Kafka, заказа в БД нет. См. R-KFK-PROD-X4.
@TransactionalEventListener для Kafka
R-DIST-OBX-X2: ещё один опасный паттерн — отправка в AFTER_COMMIT callback.
// ПЛОХО — событие отправится даже если Kafka недоступна, но retry/dedup не работают
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderCreated(OrderCreatedDomainEvent event) {
kafkaTemplate.send("order.events", event);
}
Проблемы:
- Если Kafka недоступна в момент
AFTER_COMMIT— exception в listener-е пропадает в логе, событие потеряно, ни retry, ни recovery. - DB commit прошёл, событие не отправилось — downstream не знает, состояние рассинхронизировано.
Корректно — outbox-таблица: запись синхронно в той же транзакции, публикация асинхронно с retry. См. R-KFK-OBX-X2.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Direct KafkaTemplate.send в command-handler | R-DIST-OBX-X1 | outbox-таблица + relay |
@TransactionalEventListener для Kafka | R-DIST-OBX-X2 | outbox-relay |
Outbox-relay без SKIP LOCKED | R-DIST-OBX-1 | FOR UPDATE SKIP LOCKED для конкурентных relay-ев |
| Outbox без partial-index unpublished | R-DIST-OBX-1 | WHERE published_at IS NULL |
| Kafka как source of truth | R-DIST-OBX-3 | PG sole SoT, Kafka транспорт |
| Inbox для каждого consumer-а | R-DIST-OBX-2 | processed_event dedup дефолт, inbox только для critical |
| DELETE published rows из outbox | R-DIST-OBX-3 | hold для audit/rebuild, cleanup отдельным job-ом по retention |
Куда дальше
- Distributed Patterns → раздел 5. Outbox + Inbox — нормативные формулировки.
- Idempotency — receiver обязан быть идемпотентным при at-least-once.
- Saga — saga-state-таблица и outbox работают вместе.
- Eventual consistency — outbox — главный механизм EC.
- Compensation — compensation-команды публикуются через outbox.
- Kafka → outbox publishing — детальная конфигурация relay-я.
- PG runtime → outbox-relay — FOR UPDATE SKIP LOCKED, lock_timeout, batching.