Опирается на правила: R-DIST-OBX-1R-DIST-OBX-3 и R-DIST-OBX-X1R-DIST-OBX-X2 из Distributed Patterns — раздел 5. Outbox + Inbox.

Важно знать

  • Outbox решает «БД commit + message publish атомарно»: INSERT в outbox_message и бизнес-запись в одной pgx.Tx — либо оба коммитятся, либо оба откатываются.
  • Go не имеет @Transactional — транзакция явная: pgx.Tx пробрасывается в outbox.Writer.Write(ctx, tx, msg) через queries.WithTx(tx).
  • Relay — отдельная горутина в errgroup, запускается при старте приложения; поллит outbox_message WHERE published_at IS NULL с FOR UPDATE SKIP LOCKED и пишет в Kafka через kafka.Writer.
  • Inbox — обратная сторона: consumer пишет сообщение в inbox_message до обработки; отдельный воркер обрабатывает unprocessed строки в своём темпе. Применяется только при высоком burst или критических финансовых потоках.
  • В большинстве случаев inbox избыточен — достаточно processed_event dedup-таблицы, проверяемой в той же pgx.Tx.
  • Single source of truthpgxpool сервиса. Kafka — транспорт; при потере retention — outbox_message перепубликует.
  • Запрет direct publish: kafka.Writer.WriteMessages внутри handler-транзакции — не атомарен с pgx.Tx; при rollback событие уже в Kafka.
  • Запрет goroutine-after-commit без outbox: ни один горутина не даёт гарантий при panick/crash между commit и send.

Outbox — единственный способ получить at-least-once гарантию доставки без двухфазного коммита между PG и Kafka.

Проблема «commit + publish»

Что хочется написать:

// core/order/create_order_handler.go

func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
    tx, _ := h.pool.Begin(ctx)
    defer tx.Rollback(ctx)

    order := NewOrder(cmd)
    if err := h.queries.WithTx(tx).InsertOrder(ctx, toInsertParams(order)); err != nil {
        return nil, fmt.Errorf("insert order: %w", err)
    }

    // ПЛОХО — нет атомарности: если tx.Commit упадёт — событие уже в Kafka
    _ = h.producer.WriteMessages(ctx, kafka.Message{
        Topic: "order.events",
        Value: mustMarshal(OrderCreatedEvent{OrderID: order.ID}),
    })

    return order, tx.Commit(ctx)
}

Варианты сбоя:

  • tx.Commit прошёл, WriteMessages упал → событие потеряно, downstream не знает.
  • WriteMessages прошёл, tx.Commit упал → событие в Kafka, заказа в БД нет → downstream обрабатывает несуществующий заказ.
  • Crash между WriteMessages и tx.Commit — оба варианта одновременно в разных инстансах.

Distributed transaction между PG и Kafka не существует: Kafka не поддерживает XA, в Go нет стандартного XA-менеджера (database/sql не знает XA). Outbox решает это через локальную транзакцию в PG.

Outbox writer

R-DIST-OBX-1: outbox для исходящих событий обязателен.

Схема

CREATE TABLE outbox_message (
    id             bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    message_id     uuid        NOT NULL UNIQUE,
    topic          text        NOT NULL,
    key            text        NOT NULL,
    payload        jsonb       NOT NULL,
    headers        jsonb       NOT NULL DEFAULT '{}',
    created_at     timestamptz NOT NULL DEFAULT now(),
    published_at   timestamptz
);

CREATE INDEX ix_outbox_message_unpublished ON outbox_message(id)
    WHERE published_at IS NULL;

Partial index WHERE published_at IS NULL — relay сканирует только непубликованные строки. После публикации запись остаётся для audit, но из «горячего» индекса исчезает.

Writer

// internal/outbox/writer.go
package outbox

type Message struct {
    SagaID  uuid.UUID
    Topic   string
    Key     string
    Payload any
    Headers map[string]string
}

type Writer struct {
    queries *db.Queries
}

func NewWriter(queries *db.Queries) *Writer {
    return &Writer{queries: queries}
}

func (w *Writer) Write(ctx context.Context, tx pgx.Tx, msg Message) error {
    payload, err := json.Marshal(msg.Payload)
    if err != nil {
        return fmt.Errorf("marshal outbox payload: %w", err)
    }
    headers, err := json.Marshal(msg.Headers)
    if err != nil {
        return fmt.Errorf("marshal outbox headers: %w", err)
    }
    return w.queries.WithTx(tx).InsertOutboxMessage(ctx, db.InsertOutboxMessageParams{
        MessageID: uuid.New(),
        Topic:     msg.Topic,
        Key:       msg.Key,
        Payload:   payload,
        Headers:   headers,
        CreatedAt: time.Now(),
    })
}

queries.WithTx(tx) гарантирует, что INSERT INTO outbox_message выполняется в той же транзакции, что и бизнес-запись. Rollback откатывает оба.

Handler — правильно

// core/order/create_order_handler.go

type CreateOrderHandler struct {
    queries *db.Queries
    pool    *pgxpool.Pool
    outbox  *outbox.Writer
}

func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
    tx, err := h.pool.Begin(ctx)
    if err != nil {
        return nil, fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    order := NewOrder(cmd)
    qtx := h.queries.WithTx(tx)

    if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        Amount:     order.Amount,
        Status:     "pending",
    }); err != nil {
        return nil, fmt.Errorf("insert order: %w", err)
    }

    if err := h.outbox.Write(ctx, tx, outbox.Message{
        Topic:   "order.events",
        Key:     order.ID.String(),
        Payload: OrderCreatedEvent{OrderID: order.ID, CustomerID: order.CustomerID, Amount: order.Amount},
        Headers: map[string]string{"event_type": "OrderCreated", "event_id": uuid.NewString()},
    }); err != nil {
        return nil, fmt.Errorf("write outbox: %w", err)
    }

    return order, tx.Commit(ctx)
}

Либо InsertOrder и InsertOutboxMessage оба commit, либо оба rollback. Никакого race condition.

Outbox relay

Relay — отдельный компонент, не inline в handler. Запускается в errgroup рядом с chi-сервером.

Relay struct

// internal/outbox/relay.go
package outbox

type Relay struct {
    queries  *db.Queries
    pool     *pgxpool.Pool
    producer *kafka.Writer
}

func NewRelay(queries *db.Queries, pool *pgxpool.Pool, producer *kafka.Writer) *Relay {
    return &Relay{queries: queries, pool: pool, producer: producer}
}

func (r *Relay) Run(ctx context.Context) error {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.flush(ctx); err != nil {
                slog.ErrorContext(ctx, "outbox flush failed", "error", err)
            }
        }
    }
}

func (r *Relay) flush(ctx context.Context) error {
    tx, err := r.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin relay tx: %w", err)
    }
    defer tx.Rollback(ctx)

    msgs, err := r.queries.WithTx(tx).FetchPendingOutboxMessages(ctx, 100)
    if err != nil {
        return fmt.Errorf("fetch outbox: %w", err)
    }
    if len(msgs) == 0 {
        return tx.Rollback(ctx)
    }

    kafkaMsgs := make([]kafka.Message, 0, len(msgs))
    for _, m := range msgs {
        kafkaMsgs = append(kafkaMsgs, kafka.Message{
            Topic:   m.Topic,
            Key:     []byte(m.Key),
            Value:   m.Payload,
            Headers: decodeHeaders(m.Headers),
        })
    }

    if err := r.producer.WriteMessages(ctx, kafkaMsgs...); err != nil {
        return fmt.Errorf("write kafka: %w", err)
    }

    ids := make([]pgtype.UUID, len(msgs))
    for i, m := range msgs {
        ids[i] = m.MessageID
    }
    if err := r.queries.WithTx(tx).MarkOutboxMessagesSent(ctx, ids); err != nil {
        return fmt.Errorf("mark sent: %w", err)
    }

    return tx.Commit(ctx)
}

FOR UPDATE SKIP LOCKED

SQL-запрос relay-я обязательно использует FOR UPDATE SKIP LOCKED:

-- db/query/outbox.sql
-- name: FetchPendingOutboxMessages :many
SELECT *
FROM outbox_message
WHERE published_at IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED;

SKIP LOCKED позволяет нескольким инстансам relay-я работать параллельно без блокировки друг друга: каждый берёт свою порцию строк. При рестарте relay после сбоя — unpublished строки снова доступны. At-least-once при crash между WriteMessages и MarkOutboxMessagesSent — receiver обязан быть идемпотентным.

Запуск в errgroup

// cmd/server/main.go

func run(ctx context.Context, cfg *Config) error {
    pool, _ := pgxpool.New(ctx, cfg.DatabaseURL)
    queries := db.New(pool)
    producer := &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBrokers...), Balancer: &kafka.Hash{}}

    outboxWriter := outbox.NewWriter(queries)
    outboxRelay := outbox.NewRelay(queries, pool, producer)

    router := chi.NewRouter()
    registerRoutes(router, queries, pool, outboxWriter)

    g, gctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        return http.ListenAndServe(cfg.Addr, router)
    })

    g.Go(func() error {
        return outboxRelay.Run(gctx)
    })

    return g.Wait()
}

Single source of truth

R-DIST-OBX-3: БД сервиса — единственный источник правды; Kafka — транспорт.

Что это даёт:

  • Потеря Kafka-данных (retention истёк, broker сломался) — outbox_message продолжает накапливать; после восстановления broker relay публикует пропущенное.
  • Rebuild read-projection — скрипт перечитывает outbox_message и публикует события повторно; downstream consumer обрабатывает идемпотентно.
  • Полный audit — каждое событие, которое сервис когда-либо породил, остаётся в таблице.

Сравнить с «Kafka как source of truth»: требует Kafka Connect / KStreams, retention становится критичным, потеря брокера — потеря истории. UCP выбирает PG как SoT.

Inbox pattern

R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_message до обработки; отдельный воркер обрабатывает unprocessed-строки в своём темпе.

Схема inbox

CREATE TABLE inbox_message (
    event_id       uuid        PRIMARY KEY,
    received_at    timestamptz NOT NULL DEFAULT now(),
    topic          text        NOT NULL,
    payload        jsonb       NOT NULL,
    processed      boolean     NOT NULL DEFAULT false,
    processed_at   timestamptz
);

CREATE INDEX ix_inbox_message_unprocessed ON inbox_message(received_at)
    WHERE NOT processed;

Consumer + воркер

// adapters/in/kafka/payment_consumer.go

type PaymentConsumer struct {
    queries *db.Queries
    pool    *pgxpool.Pool
    reader  *kafka.Reader
    worker  *InboxWorker
}

func (c *PaymentConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            return fmt.Errorf("fetch message: %w", err)
        }

        eventID := headerValue(msg.Headers, "event_id")
        if eventID == "" {
            _ = c.reader.CommitMessages(ctx, msg)
            continue
        }

        tx, err := c.pool.Begin(ctx)
        if err != nil {
            return fmt.Errorf("begin inbox tx: %w", err)
        }

        err = c.queries.WithTx(tx).InsertInboxMessage(ctx, db.InsertInboxMessageParams{
            EventID: mustParseUUID(eventID),
            Topic:   msg.Topic,
            Payload: msg.Value,
        })
        if err != nil && !isDuplicateKey(err) {
            tx.Rollback(ctx)
            return fmt.Errorf("insert inbox: %w", err)
        }

        if commitErr := tx.Commit(ctx); commitErr != nil {
            return fmt.Errorf("commit inbox: %w", commitErr)
        }

        _ = c.reader.CommitMessages(ctx, msg)
    }
}
// adapters/in/kafka/inbox_worker.go

type InboxWorker struct {
    queries *db.Queries
    pool    *pgxpool.Pool
    handler PaymentEventHandler
}

func (w *InboxWorker) Run(ctx context.Context) error {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := w.processNext(ctx); err != nil {
                slog.ErrorContext(ctx, "inbox worker error", "error", err)
            }
        }
    }
}

func (w *InboxWorker) processNext(ctx context.Context) error {
    tx, err := w.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin worker tx: %w", err)
    }
    defer tx.Rollback(ctx)

    rows, err := w.queries.WithTx(tx).FetchUnprocessedInboxMessages(ctx, 50)
    if err != nil {
        return fmt.Errorf("fetch inbox: %w", err)
    }

    for _, row := range rows {
        var event PaymentEvent
        if err := json.Unmarshal(row.Payload, &event); err != nil {
            slog.ErrorContext(ctx, "unmarshal inbox payload", "event_id", row.EventID, "error", err)
            continue
        }

        if err := w.handler.Handle(ctx, tx, event); err != nil {
            return fmt.Errorf("handle inbox event %s: %w", row.EventID, err)
        }

        if err := w.queries.WithTx(tx).MarkInboxMessageProcessed(ctx, row.EventID); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit(ctx)
}

Когда использовать inbox

  • Bursty traffic — Kafka даёт burst 10 000 msg/s, обработка тяжёлая; inbox развязывает приём и обработку: consumer принимает быстро, воркер работает в своём темпе.
  • Критические финансовые потоки — нужно гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при panick.

В большинстве случаев inbox избыточен. Если обработка лёгкая и DLQ достаточен — inbox добавляет сложность без пользы.

КритерийТолько processed_eventInbox + воркер
Сложностьнизкаясредняя
Burst handlingограничен concurrency consumer-аразвязка приёма и обработки
Recovery после crashre-consume из Kafkaотдельная обработка inbox-строк
Когда применятьдефолтфинансовые потоки / высокий burst

Dedup через processed_event — дефолтный путь

R-DIST-IDEM-2: для большинства consumer-ов достаточно processed_event dedup-таблицы, проверяемой в той же транзакции — без inbox.

// adapters/in/kafka/consumer.go

func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
    eventID := headerValue(msg.Headers, "event_id")

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    qtx := c.queries.WithTx(tx)

    exists, err := qtx.ExistsProcessedEvent(ctx, eventID)
    if err != nil {
        return fmt.Errorf("check processed event: %w", err)
    }
    if exists {
        return nil
    }

    if err := c.processEvent(ctx, qtx, msg); err != nil {
        return err
    }

    if err := qtx.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:   eventID,
        Topic:     msg.Topic,
        ExpiresAt: time.Now().Add(72 * time.Hour),
    }); err != nil {
        return fmt.Errorf("insert processed event: %w", err)
    }

    return tx.Commit(ctx)
}

Проверка и запись в одной транзакции — нет TOCTOU; при повторной доставке того же event_id handler вернёт nil идемпотентно.

Что запрещено

АнтипаттернПравилоЧто взамен
producer.WriteMessages внутри pgx.Tx handler-аR-DIST-OBX-X1outbox.Writer.Write(ctx, tx, msg) в той же транзакции
Горутина-after-commit без outboxR-DIST-OBX-X2outbox relay с FOR UPDATE SKIP LOCKED
Relay без FOR UPDATE SKIP LOCKEDR-DIST-OBX-1SKIP LOCKED для параллельных relay-инстансов
Outbox без partial-index WHERE published_at IS NULLR-DIST-OBX-1partial index на partial scan
Kafka как source of truthR-DIST-OBX-3PG — SoT; Kafka — транспорт
Inbox для каждого consumer-а по умолчаниюR-DIST-OBX-2processed_event dedup дефолт; inbox — только critical
DELETE published rows из outboxR-DIST-OBX-3hold для audit/rebuild; cleanup отдельным job-ом по retention
Relay через time.Sleep в горутинеR-DIST-OBX-1time.NewTicker + errgroup с graceful shutdown

Куда дальше

  • Idempotency — receiver обязан быть идемпотентным при at-least-once; processed_event dedup-таблица.
  • Saga — оркестрация vs хореография — saga-state-таблица и outbox работают вместе; outbox.Write в каждом переходе.
  • Eventual consistency — outbox — главный механизм EC; лаг проекции как read_projection_lag_seconds.
  • Compensation — compensation-команды публикуются через outbox в той же pgx.Tx.
  • Distributed transactions — что не делать — почему tx1.Commit + tx2.Commit не замена outbox.
  • Когда нужны распределённые паттерны — outbox актуален только при cross-service операции.