Опирается на правила:
R-KFK-OBX-1…R-KFK-OBX-4иR-KFK-OBX-X1…R-KFK-OBX-X3из Kafka Style Guide → раздел 3. Outbox publishing.
Важно знать
- Domain events публикуются через outbox-relay, не напрямую
KafkaTemplate.sendиз handler.- Запись в
outbox_eventидёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.- Outbox-relay — отдельный
@Componentс@Scheduled, читает unpublished сFOR UPDATE SKIP LOCKED, публикует, помечаетpublished_at.- Topic name derives от
eventType/aggregateType:<service>.<aggregate-type>.<event-name>.- Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
partial-index WHERE published_at IS NULLобязателен — без него full scan на каждом poll.@TransactionalEventListenerдля Kafka без outbox — потеря событий при crash между commit и publish.
Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Подробнее теория — Distributed → outbox + inbox.
Запись в outbox из handler
R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.
@UseCase
@RequiredArgsConstructor
public class ConfirmOrderHandler implements UseCaseHandler<ConfirmOrderCommand, Order> {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxRepository;
private final JsonbHelper jsonbHelper;
@Override
@Transactional
public Order handle(ConfirmOrderCommand command) {
var order = orderRepository.findById(command.orderId(), SelectMode.FOR_UPDATE).orElseThrow();
order.confirm();
orderRepository.save(order);
outboxRepository.append(OutboxEvent.builder()
.eventId(UuidV7.generate())
.aggregateType("Order")
.aggregateId(order.getId().toString())
.eventType("order.confirmed.v1")
.payload(jsonbHelper.serialize(OrderConfirmedEvent.from(order)))
.topic("order-service.order.confirmed")
.partitionKey(order.getId().toString())
.build());
return order;
}
}
Атомарность гарантирует PG: либо обе записи commit, либо обе rollback. Никакой XA с Kafka не нужен.
Outbox-relay
R-KFK-OBX-2: отдельный @Component с @Scheduled.
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxRelay {
private final DSLContext dsl;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Scheduled(fixedDelay = 500)
public void publish() {
dsl.transaction(tx -> {
var batch = tx.dsl()
.selectFrom(OUTBOX_EVENT)
.where(OUTBOX_EVENT.PUBLISHED_AT.isNull())
.orderBy(OUTBOX_EVENT.ID)
.limit(50)
.forUpdate().skipLocked()
.fetch();
for (var row : batch) {
try {
var payload = objectMapper.readValue(row.getPayload().data(), Object.class);
kafkaTemplate.send(row.getTopic(), row.getPartitionKey(), payload).get();
row.setPublishedAt(Instant.now());
row.store();
} catch (Exception e) {
log.error("Failed to publish outbox event id={} topic={}",
row.getId(), row.getTopic(), e);
throw new OutboxPublishException(row.getId(), e);
}
}
});
}
}
FOR UPDATE SKIP LOCKED — несколько pod-ов relay могут гонять параллельно, каждый берёт свою порцию, без блокировок. Горизонтальное масштабирование без координации.
fixedDelay = 500ms даёт типичную задержку публикации ~500ms. Для критичных flow можно 100ms, но цена — больше нагрузки на DB. Для аналитики — 5000ms нормально.
Schema outbox-таблицы
R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
ON outbox_event(id)
WHERE published_at IS NULL;
WHERE published_at IS NULL partial index — outbox_event растёт без bound, но из «горячего» индекса быстро уходит после публикации. Relay сканирует только unpublished.
Без partial index relay делает full scan миллиардной таблицы на каждом poll. Через неделю работы — DB CPU 100%.
Topic naming
R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.
| Сервис | Topic |
|---|---|
| order-service | order-service.order.created |
| order-service | order-service.order.confirmed |
| order-service | order-service.order.cancelled |
| payment-service | payment-service.payment.charged |
| payment-service | payment-service.payment.refunded |
| inventory-service | inventory-service.reservation.created |
Альтернатива — один topic на aggregate с разными eventType в payload:
order-service.order → {eventType: "OrderCreated.v1", ...}
→ {eventType: "OrderConfirmed.v1", ...}
→ {eventType: "OrderCancelled.v1", ...}
Это удобнее для consumer-side, который хочет «все события по заказу» — один @KafkaListener ловит всё, фильтрует по eventType. Цена — одно сообщение события OrderCancelled нельзя ack-нуть без обработки всего топика.
Batch обработка
R-KFK-OBX-4: relay читает 10-50 events за раз.
.limit(50)
Почему не по одному:
- DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
- Kafka roundtrip —
kafkaTemplate.send().get()ждёт ACK, ~5-20ms. - При 100 events/s по одному — relay постоянно busy, latency большой.
С batch 50 — relay поднимает 50 events одним запросом, шлёт параллельно через KafkaTemplate (с enable.idempotence: true гарантия ordering сохранена per-partition), published_at ставит в одном update. Throughput x10-20.
Что запрещено
KafkaTemplate.send в @Transactional с DB
R-KFK-OBX-X1: см. Producer R-KFK-PROD-X4. Kafka не XA.
@TransactionalEventListener для Kafka
R-KFK-OBX-X2: соблазн.
// ОПАСНО
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderConfirmed(OrderConfirmedDomainEvent event) {
kafkaTemplate.send("order-service.order.confirmed",
event.orderId().toString(), event);
}
После commit, не в транзакции. Кажется, что атомарно. На самом деле:
- DB commit прошёл, listener вызван.
kafkaTemplate.sendбросает exception (Kafka недоступна 1 секунду).- Exception проглатывается Spring event mechanism.
- DB committed, событие не отправлено. Inconsistency, никакого retry.
Outbox даёт retry автоматически (relay повторяет, пока не опубликует).
Outbox без published_at или без partial-index
R-KFK-OBX-X3: см. секцию «Schema» выше.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
KafkaTemplate.send в @Transactional с DB | R-KFK-OBX-X1 | outbox в той же транзакции |
@TransactionalEventListener для Kafka | R-KFK-OBX-X2 | outbox-relay |
Outbox без published_at partial index | R-KFK-OBX-X3 | WHERE published_at IS NULL |
Relay без FOR UPDATE SKIP LOCKED | R-KFK-OBX-2 | parallel relay pods через SKIP LOCKED |
| Relay по одному событию | R-KFK-OBX-4 | batch 10-50 |
DELETE published rows immediately | R-KFK-OBX-2 | hold для audit, cleanup отдельным job |
| Произвольное topic naming | R-KFK-OBX-3 | <service>.<aggregate>.<event> |
Outbox без event_id UNIQUE | R-KFK-OBX-1 | UNIQUE constraint защищает от двойной записи |
Куда дальше
- Kafka → раздел 3. Outbox publishing — нормативные формулировки.
- Producer — почему нельзя send напрямую из handler.
- Idempotent consumer — receiver side at-least-once.
- Event design — формат payload в outbox.
- Distributed → outbox + inbox — теория паттерна.
- PG runtime → outbox-relay — детали
FOR UPDATE SKIP LOCKED, lock_timeout. - PG migration → safe DDL — создание outbox-таблицы как safe migration.