Опирается на правила:
R-KFK-OBX-1…R-KFK-OBX-4иR-KFK-OBX-X1…R-KFK-OBX-X3из Kafka Style Guide → раздел 3. Outbox publishing.
Важно знать
- Domain events публикуются через outbox-relay, не напрямую
writer.WriteMessagesиз UseCase Handler.- Запись в
outboxидёт в той жеpgx.Tx, что бизнес-write. Либо обе commit, либо обе rollback.- Outbox-relay — отдельная горутина (
go relay.Run(ctx)), читает unpublished сFOR UPDATE SKIP LOCKED, публикует черезkafka.Writer, проставляетpublished_at.- Topic name выводится из
event_type/aggregate_typeчерез статический маппинг, не строковыми операциями на лету.- Relay работает batch (10–50 events) — снижает overhead DB-poll и Kafka-roundtrip.
- Partial index
WHERE published_at IS NULLобязателен — без него full scan растущей таблицы на каждом тике.writer.WriteMessagesпослеtx.Commitбез outbox — потеря события при падении между commit и publish.
Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PostgreSQL, без XA с Kafka. Теория паттерна — Distributed → outbox + inbox.
Запись в outbox из handler
R-KFK-OBX-1: write-handler пишет в outbox в той же pgx.Tx через которую идёт бизнес-write. Kafka в этом слое не появляется вообще.
// core/order/handler/confirm_order_handler.go
type ConfirmOrderHandler struct {
db *pgxpool.Pool
orders OrderRepository
outbox OutboxRepository
}
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
tx, err := h.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
if err != nil {
return err
}
order.Confirm()
if err := h.orders.Save(ctx, tx, order); err != nil {
return err
}
evt := NewOrderConfirmedEvent(order)
if err := h.outbox.Store(ctx, tx, OutboxEntry{
EventID: evt.EventID,
AggregateType: "Order",
AggregateID: order.ID,
EventType: "OrderConfirmed",
Payload: mustMarshal(evt),
Topic: topicFor("OrderConfirmed"),
PartitionKey: order.ID,
}); err != nil {
return fmt.Errorf("outbox store: %w", err)
}
return tx.Commit(ctx)
}
Атомарность гарантирует PG: если tx.Commit не прошёл — записи нет ни в orders, ни в outbox. Никакого XA с Kafka не нужен.
OutboxEntry — плоская структура без бизнес-логики:
// infra/outbox/entry.go
type OutboxEntry struct {
EventID string
AggregateType string
AggregateID string
EventType string
Payload []byte
Topic string
PartitionKey string
}
Schema outbox-таблицы
R-KFK-OBX-X3: partial index по published_at IS NULL — обязателен.
CREATE TABLE outbox (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_unpublished
ON outbox (id)
WHERE published_at IS NULL;
WHERE published_at IS NULL — relay сканирует только непубликованные. Таблица растёт без ограничений, но «горячая» часть индекса минимальна. Без partial index relay делает full scan миллиардной таблицы на каждом тике.
event_id UNIQUE — защита от двойной записи одного события при повторном вызове handler (retried request до транзакции).
OutboxRepository — запись через pgx.Tx
// infra/outbox/repository.go
type OutboxRepository struct{}
func (r *OutboxRepository) Store(ctx context.Context, tx pgx.Tx, e OutboxEntry) error {
_, err := tx.Exec(ctx, `
INSERT INTO outbox
(event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (event_id) DO NOTHING
`,
e.EventID, e.AggregateType, e.AggregateID,
e.EventType, e.Payload, e.Topic, e.PartitionKey,
)
if err != nil {
return fmt.Errorf("outbox insert %s: %w", e.EventID, err)
}
return nil
}
func (r *OutboxRepository) FetchUnpublished(ctx context.Context, db *pgxpool.Pool, limit int) ([]outboxRow, error) {
rows, err := db.Query(ctx, `
SELECT id, event_id, topic, partition_key, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED
`, limit)
if err != nil {
return nil, fmt.Errorf("fetch unpublished: %w", err)
}
defer rows.Close()
return pgx.CollectRows(rows, pgx.RowToStructByName[outboxRow])
}
func (r *OutboxRepository) MarkPublished(ctx context.Context, db *pgxpool.Pool, id int64) error {
_, err := db.Exec(ctx,
`UPDATE outbox SET published_at = now() WHERE id = $1`, id,
)
return err
}
FOR UPDATE SKIP LOCKED — несколько экземпляров relay (разные pod-ы) берут разные порции без блокировок. Горизонтальное масштабирование без координации.
ON CONFLICT (event_id) DO NOTHING в insert — идемпотентная запись. Повторный вызов handler не создаст дубля в outbox.
Outbox-relay
R-KFK-OBX-2: отдельная горутина, запускается в main рядом с HTTP-сервером.
// infra/kafka/outbox_relay.go
type OutboxRelay struct {
db *pgxpool.Pool
repo *OutboxRepository
writer *kafka.Writer
log *slog.Logger
}
func (r *OutboxRelay) Run(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.log.ErrorContext(ctx, "outbox relay batch failed", "error", err)
}
}
}
}
func (r *OutboxRelay) processBatch(ctx context.Context) error {
events, err := r.repo.FetchUnpublished(ctx, r.db, 50) // R-KFK-OBX-4: batch 10–50
if err != nil {
return fmt.Errorf("fetch: %w", err)
}
for _, e := range events {
msg := kafka.Message{
Topic: e.Topic,
Key: []byte(e.PartitionKey),
Value: e.Payload,
}
if err := r.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("publish %s %s: %w", e.EventType, e.EventID, err)
}
if err := r.repo.MarkPublished(ctx, r.db, e.ID); err != nil {
return fmt.Errorf("mark published %s: %w", e.EventID, err)
}
}
return nil
}
При ошибке publish relay возвращает ошибку и ждёт следующего тика. Строка в outbox остаётся с published_at IS NULL — relay повторит. Это и есть автоматический retry без дополнительной инфраструктуры.
Запуск из main:
// main.go
relay := &OutboxRelay{db: pool, repo: outboxRepo, writer: kafkaWriter, log: logger}
g.Go(func() error { relay.Run(gctx); return nil })
g — golang.org/x/sync/errgroup или аналог для управления жизненным циклом горутин.
Topic naming
R-KFK-OBX-3: топик выводится из event_type через статический маппинг, не строковыми операциями на лету.
// infra/kafka/topics.go
var topicByEventType = map[string]string{
"OrderConfirmed": "order-service.order.confirmed",
"OrderCancelled": "order-service.order.cancelled",
"ProductReserved": "order-service.product.reserved",
"CustomerBlocked": "customer-service.customer.blocked",
"PaymentCharged": "payment-service.payment.charged",
}
func topicFor(eventType string) string {
t, ok := topicByEventType[eventType]
if !ok {
panic(fmt.Sprintf("unknown event type: %s", eventType))
}
return t
}
Convention — <service>.<aggregate-type>.<event-name>:
| Сервис | Topic |
|---|---|
| order-service | order-service.order.confirmed |
| order-service | order-service.order.cancelled |
| order-service | order-service.product.reserved |
| customer-service | customer-service.customer.blocked |
| payment-service | payment-service.payment.charged |
Альтернатива — один topic на aggregate (order-service.order), consumer фильтрует по event_type в payload. Удобно когда один consumer хочет все события одного агрегата. Цена — нет возможности ack только один тип события без обработки всего топика.
Событие в outbox
R-KFK-EVT-2, R-KFK-EVT-4: событие — неизменяемая Go-структура в core/<bc>/event/, конструктор — единственная точка создания.
// core/order/event/order_confirmed.go
type OrderConfirmedEvent struct {
EventID string `json:"event_id"`
OccurredAt time.Time `json:"occurred_at"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalRUB int64 `json:"total_rub"` // минорные единицы (копейки), не float64
EventType string `json:"event_type"`
}
func NewOrderConfirmedEvent(o Order) OrderConfirmedEvent {
return OrderConfirmedEvent{
EventID: uuid.Must(uuid.NewV7()).String(),
OccurredAt: time.Now().UTC(),
OrderID: o.ID,
CustomerID: o.CustomerID,
TotalRUB: o.TotalKopecks,
EventType: "OrderConfirmed",
}
}
mustMarshal в handler — обёртка json.Marshal с паникой при ошибке; ошибка сериализации собственного события означает баг в коде, не runtime-ошибку:
func mustMarshal(v any) []byte {
b, err := json.Marshal(v)
if err != nil {
panic(fmt.Sprintf("marshal %T: %v", v, err))
}
return b
}
Batch-публикация
R-KFK-OBX-4: relay читает 10–50 событий за тик.
Почему не по одному:
- DB-poll overhead — каждый запрос ~1–2ms даже с indexed scan.
- Kafka roundtrip —
writer.WriteMessagesсRequiredAcks: RequireAllждёт ack лидера и реплик, ~5–20ms. - При 100 events/s по одному — relay не успевает, latency растёт.
С batch 50 — relay поднимает 50 событий одним запросом, публикует последовательно с Key: []byte(partitionKey) (ordering per-partition сохранён), published_at ставит сразу после publish каждого. Throughput x10–20 относительно поодиночного.
При ошибке на середине batch (например, Kafka стала недоступна) — уже опубликованные будут помечены, остаток подберёт следующий тик. event_id UNIQUE в payload защитит consumer от дублей при повторной доставке.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
writer.WriteMessages из UseCase Handler с DB-операцией | R-KFK-OBX-X1 | запись OutboxEntry в той же pgx.Tx |
Публикация после tx.Commit без outbox | R-KFK-OBX-X2 | outbox-relay повторяет до успеха |
Outbox без published_at или без partial index | R-KFK-OBX-X3 | WHERE published_at IS NULL |
Relay без FOR UPDATE SKIP LOCKED | R-KFK-OBX-2 | несколько pod-ов без блокировок |
| Relay по одному событию | R-KFK-OBX-4 | batch 10–50 за тик |
Topic из строковых операций (strings.ToLower(eventType)) | R-KFK-OBX-3 | статический маппинг с panic на unknown |
Без event_id UNIQUE в outbox | R-KFK-OBX-1 | UNIQUE constraint защищает от двойной записи |
Relay вызывает WriteMessages из той же pgx.Tx что MarkPublished | R-KFK-OBX-X1 | publish → потом MarkPublished в отдельном запросе |
Куда дальше
- Producer — почему нельзя
WriteMessagesнапрямую из handler; настройкаkafka.Writer. - Idempotent consumer — receiver side at-least-once;
processed_event+pgx.Tx. - Event design — формат payload в outbox;
EventIDUUID v7; forward-compat. - Consumer —
kafka.ReaderсCommitInterval: 0; manual commit после обработки. - Конфигурация —
KafkaConfigчерезenvconfig; проверка топиков на старте. - Observability — метрики relay (
outbox_pending_events_total); tracing через OTel. - Retry topic + DLQ — retry-топики с задержкой вне poll-цикла; DLQ-alert.
- Security — TLS-dialer; per-service
ClientID; PII в restricted-топиках.