Опирается на правила: R-KFK-OBX-1R-KFK-OBX-4 и R-KFK-OBX-X1R-KFK-OBX-X3 из Kafka Style Guide → раздел 3. Outbox publishing.

Важно знать

  • Domain events публикуются через outbox-relay, не напрямую writer.WriteMessages из UseCase Handler.
  • Запись в outbox идёт в той же pgx.Tx, что бизнес-write. Либо обе commit, либо обе rollback.
  • Outbox-relay — отдельная горутина (go relay.Run(ctx)), читает unpublished с FOR UPDATE SKIP LOCKED, публикует через kafka.Writer, проставляет published_at.
  • Topic name выводится из event_type / aggregate_type через статический маппинг, не строковыми операциями на лету.
  • Relay работает batch (10–50 events) — снижает overhead DB-poll и Kafka-roundtrip.
  • Partial index WHERE published_at IS NULL обязателен — без него full scan растущей таблицы на каждом тике.
  • writer.WriteMessages после tx.Commit без outbox — потеря события при падении между commit и publish.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PostgreSQL, без XA с Kafka. Теория паттерна — Distributed → outbox + inbox.

Запись в outbox из handler

R-KFK-OBX-1: write-handler пишет в outbox в той же pgx.Tx через которую идёт бизнес-write. Kafka в этом слое не появляется вообще.

// core/order/handler/confirm_order_handler.go
type ConfirmOrderHandler struct {
    db     *pgxpool.Pool
    orders OrderRepository
    outbox OutboxRepository
}

func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
    tx, err := h.db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
    if err != nil {
        return err
    }
    order.Confirm()

    if err := h.orders.Save(ctx, tx, order); err != nil {
        return err
    }

    evt := NewOrderConfirmedEvent(order)
    if err := h.outbox.Store(ctx, tx, OutboxEntry{
        EventID:      evt.EventID,
        AggregateType: "Order",
        AggregateID:  order.ID,
        EventType:    "OrderConfirmed",
        Payload:      mustMarshal(evt),
        Topic:        topicFor("OrderConfirmed"),
        PartitionKey: order.ID,
    }); err != nil {
        return fmt.Errorf("outbox store: %w", err)
    }

    return tx.Commit(ctx)
}

Атомарность гарантирует PG: если tx.Commit не прошёл — записи нет ни в orders, ни в outbox. Никакого XA с Kafka не нужен.

OutboxEntry — плоская структура без бизнес-логики:

// infra/outbox/entry.go
type OutboxEntry struct {
    EventID      string
    AggregateType string
    AggregateID  string
    EventType    string
    Payload      []byte
    Topic        string
    PartitionKey string
}

Schema outbox-таблицы

R-KFK-OBX-X3: partial index по published_at IS NULL — обязателен.

CREATE TABLE outbox (
    id             bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id       uuid        NOT NULL UNIQUE,
    aggregate_type text        NOT NULL,
    aggregate_id   text        NOT NULL,
    event_type     text        NOT NULL,
    payload        jsonb       NOT NULL,
    topic          text        NOT NULL,
    partition_key  text        NOT NULL,
    created_at     timestamptz NOT NULL DEFAULT now(),
    published_at   timestamptz
);

CREATE INDEX ix_outbox_unpublished
    ON outbox (id)
    WHERE published_at IS NULL;

WHERE published_at IS NULL — relay сканирует только непубликованные. Таблица растёт без ограничений, но «горячая» часть индекса минимальна. Без partial index relay делает full scan миллиардной таблицы на каждом тике.

event_id UNIQUE — защита от двойной записи одного события при повторном вызове handler (retried request до транзакции).

OutboxRepository — запись через pgx.Tx

// infra/outbox/repository.go
type OutboxRepository struct{}

func (r *OutboxRepository) Store(ctx context.Context, tx pgx.Tx, e OutboxEntry) error {
    _, err := tx.Exec(ctx, `
        INSERT INTO outbox
            (event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (event_id) DO NOTHING
    `,
        e.EventID, e.AggregateType, e.AggregateID,
        e.EventType, e.Payload, e.Topic, e.PartitionKey,
    )
    if err != nil {
        return fmt.Errorf("outbox insert %s: %w", e.EventID, err)
    }
    return nil
}

func (r *OutboxRepository) FetchUnpublished(ctx context.Context, db *pgxpool.Pool, limit int) ([]outboxRow, error) {
    rows, err := db.Query(ctx, `
        SELECT id, event_id, topic, partition_key, payload
        FROM   outbox
        WHERE  published_at IS NULL
        ORDER  BY id
        LIMIT  $1
        FOR UPDATE SKIP LOCKED
    `, limit)
    if err != nil {
        return nil, fmt.Errorf("fetch unpublished: %w", err)
    }
    defer rows.Close()
    return pgx.CollectRows(rows, pgx.RowToStructByName[outboxRow])
}

func (r *OutboxRepository) MarkPublished(ctx context.Context, db *pgxpool.Pool, id int64) error {
    _, err := db.Exec(ctx,
        `UPDATE outbox SET published_at = now() WHERE id = $1`, id,
    )
    return err
}

FOR UPDATE SKIP LOCKED — несколько экземпляров relay (разные pod-ы) берут разные порции без блокировок. Горизонтальное масштабирование без координации.

ON CONFLICT (event_id) DO NOTHING в insert — идемпотентная запись. Повторный вызов handler не создаст дубля в outbox.

Outbox-relay

R-KFK-OBX-2: отдельная горутина, запускается в main рядом с HTTP-сервером.

// infra/kafka/outbox_relay.go
type OutboxRelay struct {
    db     *pgxpool.Pool
    repo   *OutboxRepository
    writer *kafka.Writer
    log    *slog.Logger
}

func (r *OutboxRelay) Run(ctx context.Context) {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.log.ErrorContext(ctx, "outbox relay batch failed", "error", err)
            }
        }
    }
}

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    events, err := r.repo.FetchUnpublished(ctx, r.db, 50) // R-KFK-OBX-4: batch 10–50
    if err != nil {
        return fmt.Errorf("fetch: %w", err)
    }
    for _, e := range events {
        msg := kafka.Message{
            Topic: e.Topic,
            Key:   []byte(e.PartitionKey),
            Value: e.Payload,
        }
        if err := r.writer.WriteMessages(ctx, msg); err != nil {
            return fmt.Errorf("publish %s %s: %w", e.EventType, e.EventID, err)
        }
        if err := r.repo.MarkPublished(ctx, r.db, e.ID); err != nil {
            return fmt.Errorf("mark published %s: %w", e.EventID, err)
        }
    }
    return nil
}

При ошибке publish relay возвращает ошибку и ждёт следующего тика. Строка в outbox остаётся с published_at IS NULL — relay повторит. Это и есть автоматический retry без дополнительной инфраструктуры.

Запуск из main:

// main.go
relay := &OutboxRelay{db: pool, repo: outboxRepo, writer: kafkaWriter, log: logger}
g.Go(func() error { relay.Run(gctx); return nil })

ggolang.org/x/sync/errgroup или аналог для управления жизненным циклом горутин.

Topic naming

R-KFK-OBX-3: топик выводится из event_type через статический маппинг, не строковыми операциями на лету.

// infra/kafka/topics.go
var topicByEventType = map[string]string{
    "OrderConfirmed":    "order-service.order.confirmed",
    "OrderCancelled":    "order-service.order.cancelled",
    "ProductReserved":   "order-service.product.reserved",
    "CustomerBlocked":   "customer-service.customer.blocked",
    "PaymentCharged":    "payment-service.payment.charged",
}

func topicFor(eventType string) string {
    t, ok := topicByEventType[eventType]
    if !ok {
        panic(fmt.Sprintf("unknown event type: %s", eventType))
    }
    return t
}

Convention — <service>.<aggregate-type>.<event-name>:

СервисTopic
order-serviceorder-service.order.confirmed
order-serviceorder-service.order.cancelled
order-serviceorder-service.product.reserved
customer-servicecustomer-service.customer.blocked
payment-servicepayment-service.payment.charged

Альтернатива — один topic на aggregate (order-service.order), consumer фильтрует по event_type в payload. Удобно когда один consumer хочет все события одного агрегата. Цена — нет возможности ack только один тип события без обработки всего топика.

Событие в outbox

R-KFK-EVT-2, R-KFK-EVT-4: событие — неизменяемая Go-структура в core/<bc>/event/, конструктор — единственная точка создания.

// core/order/event/order_confirmed.go
type OrderConfirmedEvent struct {
    EventID    string    `json:"event_id"`
    OccurredAt time.Time `json:"occurred_at"`
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    TotalRUB   int64     `json:"total_rub"`  // минорные единицы (копейки), не float64
    EventType  string    `json:"event_type"`
}

func NewOrderConfirmedEvent(o Order) OrderConfirmedEvent {
    return OrderConfirmedEvent{
        EventID:    uuid.Must(uuid.NewV7()).String(),
        OccurredAt: time.Now().UTC(),
        OrderID:    o.ID,
        CustomerID: o.CustomerID,
        TotalRUB:   o.TotalKopecks,
        EventType:  "OrderConfirmed",
    }
}

mustMarshal в handler — обёртка json.Marshal с паникой при ошибке; ошибка сериализации собственного события означает баг в коде, не runtime-ошибку:

func mustMarshal(v any) []byte {
    b, err := json.Marshal(v)
    if err != nil {
        panic(fmt.Sprintf("marshal %T: %v", v, err))
    }
    return b
}

Batch-публикация

R-KFK-OBX-4: relay читает 10–50 событий за тик.

Почему не по одному:

  • DB-poll overhead — каждый запрос ~1–2ms даже с indexed scan.
  • Kafka roundtripwriter.WriteMessages с RequiredAcks: RequireAll ждёт ack лидера и реплик, ~5–20ms.
  • При 100 events/s по одному — relay не успевает, latency растёт.

С batch 50 — relay поднимает 50 событий одним запросом, публикует последовательно с Key: []byte(partitionKey) (ordering per-partition сохранён), published_at ставит сразу после publish каждого. Throughput x10–20 относительно поодиночного.

При ошибке на середине batch (например, Kafka стала недоступна) — уже опубликованные будут помечены, остаток подберёт следующий тик. event_id UNIQUE в payload защитит consumer от дублей при повторной доставке.

Что запрещено

АнтипаттернПравилоЧто взамен
writer.WriteMessages из UseCase Handler с DB-операциейR-KFK-OBX-X1запись OutboxEntry в той же pgx.Tx
Публикация после tx.Commit без outboxR-KFK-OBX-X2outbox-relay повторяет до успеха
Outbox без published_at или без partial indexR-KFK-OBX-X3WHERE published_at IS NULL
Relay без FOR UPDATE SKIP LOCKEDR-KFK-OBX-2несколько pod-ов без блокировок
Relay по одному событиюR-KFK-OBX-4batch 10–50 за тик
Topic из строковых операций (strings.ToLower(eventType))R-KFK-OBX-3статический маппинг с panic на unknown
Без event_id UNIQUE в outboxR-KFK-OBX-1UNIQUE constraint защищает от двойной записи
Relay вызывает WriteMessages из той же pgx.Tx что MarkPublishedR-KFK-OBX-X1publish → потом MarkPublished в отдельном запросе

Куда дальше

  • Producer — почему нельзя WriteMessages напрямую из handler; настройка kafka.Writer.
  • Idempotent consumer — receiver side at-least-once; processed_event + pgx.Tx.
  • Event design — формат payload в outbox; EventID UUID v7; forward-compat.
  • Consumer — kafka.Reader с CommitInterval: 0; manual commit после обработки.
  • Конфигурация — KafkaConfig через envconfig; проверка топиков на старте.
  • Observability — метрики relay (outbox_pending_events_total); tracing через OTel.
  • Retry topic + DLQ — retry-топики с задержкой вне poll-цикла; DLQ-alert.
  • Security — TLS-dialer; per-service ClientID; PII в restricted-топиках.