Опирается на правила: R-KFK-OBX-1R-KFK-OBX-4 и R-KFK-OBX-X1R-KFK-OBX-X3 из Kafka Style Guide → раздел 3. Outbox publishing.

Важно знать

  • Domain events публикуются через outbox-relay, не напрямую KafkaTemplate.send из handler.
  • Запись в outbox_event идёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.
  • Outbox-relay — отдельный @Component с @Scheduled, читает unpublished с FOR UPDATE SKIP LOCKED, публикует, помечает published_at.
  • Topic name derives от eventType/aggregateType: <service>.<aggregate-type>.<event-name>.
  • Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
  • partial-index WHERE published_at IS NULL обязателен — без него full scan на каждом poll.
  • @TransactionalEventListener для Kafka без outbox — потеря событий при crash между commit и publish.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Подробнее теория — Distributed → outbox + inbox.

Запись в outbox из handler

R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.

@UseCase
@RequiredArgsConstructor
public class ConfirmOrderHandler implements UseCaseHandler<ConfirmOrderCommand, Order> {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    private final JsonbHelper jsonbHelper;

    @Override
    @Transactional
    public Order handle(ConfirmOrderCommand command) {
        var order = orderRepository.findById(command.orderId(), SelectMode.FOR_UPDATE).orElseThrow();
        order.confirm();
        orderRepository.save(order);

        outboxRepository.append(OutboxEvent.builder()
            .eventId(UuidV7.generate())
            .aggregateType("Order")
            .aggregateId(order.getId().toString())
            .eventType("order.confirmed.v1")
            .payload(jsonbHelper.serialize(OrderConfirmedEvent.from(order)))
            .topic("order-service.order.confirmed")
            .partitionKey(order.getId().toString())
            .build());

        return order;
    }
}

Атомарность гарантирует PG: либо обе записи commit, либо обе rollback. Никакой XA с Kafka не нужен.

Outbox-relay

R-KFK-OBX-2: отдельный @Component с @Scheduled.

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxRelay {

    private final DSLContext dsl;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Scheduled(fixedDelay = 500)
    public void publish() {
        dsl.transaction(tx -> {
            var batch = tx.dsl()
                .selectFrom(OUTBOX_EVENT)
                .where(OUTBOX_EVENT.PUBLISHED_AT.isNull())
                .orderBy(OUTBOX_EVENT.ID)
                .limit(50)
                .forUpdate().skipLocked()
                .fetch();

            for (var row : batch) {
                try {
                    var payload = objectMapper.readValue(row.getPayload().data(), Object.class);
                    kafkaTemplate.send(row.getTopic(), row.getPartitionKey(), payload).get();
                    row.setPublishedAt(Instant.now());
                    row.store();
                } catch (Exception e) {
                    log.error("Failed to publish outbox event id={} topic={}",
                        row.getId(), row.getTopic(), e);
                    throw new OutboxPublishException(row.getId(), e);
                }
            }
        });
    }
}

FOR UPDATE SKIP LOCKED — несколько pod-ов relay могут гонять параллельно, каждый берёт свою порцию, без блокировок. Горизонтальное масштабирование без координации.

fixedDelay = 500ms даёт типичную задержку публикации ~500ms. Для критичных flow можно 100ms, но цена — больше нагрузки на DB. Для аналитики — 5000ms нормально.

Schema outbox-таблицы

R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid NOT NULL UNIQUE,
    aggregate_type  text NOT NULL,
    aggregate_id    text NOT NULL,
    event_type      text NOT NULL,
    payload         jsonb NOT NULL,
    topic           text NOT NULL,
    partition_key   text NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
    ON outbox_event(id)
    WHERE published_at IS NULL;

WHERE published_at IS NULL partial index — outbox_event растёт без bound, но из «горячего» индекса быстро уходит после публикации. Relay сканирует только unpublished.

Без partial index relay делает full scan миллиардной таблицы на каждом poll. Через неделю работы — DB CPU 100%.

Topic naming

R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.

СервисTopic
order-serviceorder-service.order.created
order-serviceorder-service.order.confirmed
order-serviceorder-service.order.cancelled
payment-servicepayment-service.payment.charged
payment-servicepayment-service.payment.refunded
inventory-serviceinventory-service.reservation.created

Альтернатива — один topic на aggregate с разными eventType в payload:

order-service.order  → {eventType: "OrderCreated.v1", ...}
                    → {eventType: "OrderConfirmed.v1", ...}
                    → {eventType: "OrderCancelled.v1", ...}

Это удобнее для consumer-side, который хочет «все события по заказу» — один @KafkaListener ловит всё, фильтрует по eventType. Цена — одно сообщение события OrderCancelled нельзя ack-нуть без обработки всего топика.

Batch обработка

R-KFK-OBX-4: relay читает 10-50 events за раз.

.limit(50)

Почему не по одному:

  • DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
  • Kafka roundtripkafkaTemplate.send().get() ждёт ACK, ~5-20ms.
  • При 100 events/s по одному — relay постоянно busy, latency большой.

С batch 50 — relay поднимает 50 events одним запросом, шлёт параллельно через KafkaTemplate (с enable.idempotence: true гарантия ordering сохранена per-partition), published_at ставит в одном update. Throughput x10-20.

Что запрещено

KafkaTemplate.send в @Transactional с DB

R-KFK-OBX-X1: см. Producer R-KFK-PROD-X4. Kafka не XA.

@TransactionalEventListener для Kafka

R-KFK-OBX-X2: соблазн.

// ОПАСНО
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderConfirmed(OrderConfirmedDomainEvent event) {
    kafkaTemplate.send("order-service.order.confirmed",
        event.orderId().toString(), event);
}

После commit, не в транзакции. Кажется, что атомарно. На самом деле:

  • DB commit прошёл, listener вызван.
  • kafkaTemplate.send бросает exception (Kafka недоступна 1 секунду).
  • Exception проглатывается Spring event mechanism.
  • DB committed, событие не отправлено. Inconsistency, никакого retry.

Outbox даёт retry автоматически (relay повторяет, пока не опубликует).

Outbox без published_at или без partial-index

R-KFK-OBX-X3: см. секцию «Schema» выше.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
KafkaTemplate.send в @Transactional с DBR-KFK-OBX-X1outbox в той же транзакции
@TransactionalEventListener для KafkaR-KFK-OBX-X2outbox-relay
Outbox без published_at partial indexR-KFK-OBX-X3WHERE published_at IS NULL
Relay без FOR UPDATE SKIP LOCKEDR-KFK-OBX-2parallel relay pods через SKIP LOCKED
Relay по одному событиюR-KFK-OBX-4batch 10-50
DELETE published rows immediatelyR-KFK-OBX-2hold для audit, cleanup отдельным job
Произвольное topic namingR-KFK-OBX-3<service>.<aggregate>.<event>
Outbox без event_id UNIQUER-KFK-OBX-1UNIQUE constraint защищает от двойной записи

Куда дальше

  • Kafka → раздел 3. Outbox publishing — нормативные формулировки.
  • Producer — почему нельзя send напрямую из handler.
  • Idempotent consumer — receiver side at-least-once.
  • Event design — формат payload в outbox.
  • Distributed → outbox + inbox — теория паттерна.
  • PG runtime → outbox-relay — детали FOR UPDATE SKIP LOCKED, lock_timeout.
  • PG migration → safe DDL — создание outbox-таблицы как safe migration.