Опирается на правила:
R-DIST-OBX-1…R-DIST-OBX-3иR-DIST-OBX-X1…R-DIST-OBX-X2из Distributed Patterns — раздел 5. Outbox + Inbox.
Важно знать
- Outbox решает «БД commit + message publish атомарно»:
INSERTвoutbox_messageи бизнес-запись в однойpgx.Tx— либо оба коммитятся, либо оба откатываются.- Go не имеет
@Transactional— транзакция явная:pgx.Txпробрасывается вoutbox.Writer.Write(ctx, tx, msg)черезqueries.WithTx(tx).- Relay — отдельная горутина в
errgroup, запускается при старте приложения; поллитoutbox_message WHERE published_at IS NULLсFOR UPDATE SKIP LOCKEDи пишет в Kafka черезkafka.Writer.- Inbox — обратная сторона: consumer пишет сообщение в
inbox_messageдо обработки; отдельный воркер обрабатывает unprocessed строки в своём темпе. Применяется только при высоком burst или критических финансовых потоках.- В большинстве случаев inbox избыточен — достаточно
processed_eventdedup-таблицы, проверяемой в той жеpgx.Tx.- Single source of truth —
pgxpoolсервиса. Kafka — транспорт; при потере retention —outbox_messageперепубликует.- Запрет direct publish:
kafka.Writer.WriteMessagesвнутри handler-транзакции — не атомарен сpgx.Tx; при rollback событие уже в Kafka.- Запрет goroutine-after-commit без outbox: ни один горутина не даёт гарантий при panick/crash между commit и send.
Outbox — единственный способ получить at-least-once гарантию доставки без двухфазного коммита между PG и Kafka.
Проблема «commit + publish»
Что хочется написать:
// core/order/create_order_handler.go
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
tx, _ := h.pool.Begin(ctx)
defer tx.Rollback(ctx)
order := NewOrder(cmd)
if err := h.queries.WithTx(tx).InsertOrder(ctx, toInsertParams(order)); err != nil {
return nil, fmt.Errorf("insert order: %w", err)
}
// ПЛОХО — нет атомарности: если tx.Commit упадёт — событие уже в Kafka
_ = h.producer.WriteMessages(ctx, kafka.Message{
Topic: "order.events",
Value: mustMarshal(OrderCreatedEvent{OrderID: order.ID}),
})
return order, tx.Commit(ctx)
}
Варианты сбоя:
tx.Commitпрошёл,WriteMessagesупал → событие потеряно, downstream не знает.WriteMessagesпрошёл,tx.Commitупал → событие в Kafka, заказа в БД нет → downstream обрабатывает несуществующий заказ.- Crash между
WriteMessagesиtx.Commit— оба варианта одновременно в разных инстансах.
Distributed transaction между PG и Kafka не существует: Kafka не поддерживает XA, в Go нет стандартного XA-менеджера (database/sql не знает XA). Outbox решает это через локальную транзакцию в PG.
Outbox writer
R-DIST-OBX-1: outbox для исходящих событий обязателен.
Схема
CREATE TABLE outbox_message (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
message_id uuid NOT NULL UNIQUE,
topic text NOT NULL,
key text NOT NULL,
payload jsonb NOT NULL,
headers jsonb NOT NULL DEFAULT '{}',
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_message_unpublished ON outbox_message(id)
WHERE published_at IS NULL;
Partial index WHERE published_at IS NULL — relay сканирует только непубликованные строки. После публикации запись остаётся для audit, но из «горячего» индекса исчезает.
Writer
// internal/outbox/writer.go
package outbox
type Message struct {
SagaID uuid.UUID
Topic string
Key string
Payload any
Headers map[string]string
}
type Writer struct {
queries *db.Queries
}
func NewWriter(queries *db.Queries) *Writer {
return &Writer{queries: queries}
}
func (w *Writer) Write(ctx context.Context, tx pgx.Tx, msg Message) error {
payload, err := json.Marshal(msg.Payload)
if err != nil {
return fmt.Errorf("marshal outbox payload: %w", err)
}
headers, err := json.Marshal(msg.Headers)
if err != nil {
return fmt.Errorf("marshal outbox headers: %w", err)
}
return w.queries.WithTx(tx).InsertOutboxMessage(ctx, db.InsertOutboxMessageParams{
MessageID: uuid.New(),
Topic: msg.Topic,
Key: msg.Key,
Payload: payload,
Headers: headers,
CreatedAt: time.Now(),
})
}
queries.WithTx(tx) гарантирует, что INSERT INTO outbox_message выполняется в той же транзакции, что и бизнес-запись. Rollback откатывает оба.
Handler — правильно
// core/order/create_order_handler.go
type CreateOrderHandler struct {
queries *db.Queries
pool *pgxpool.Pool
outbox *outbox.Writer
}
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
tx, err := h.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
order := NewOrder(cmd)
qtx := h.queries.WithTx(tx)
if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
OrderID: order.ID,
CustomerID: order.CustomerID,
Amount: order.Amount,
Status: "pending",
}); err != nil {
return nil, fmt.Errorf("insert order: %w", err)
}
if err := h.outbox.Write(ctx, tx, outbox.Message{
Topic: "order.events",
Key: order.ID.String(),
Payload: OrderCreatedEvent{OrderID: order.ID, CustomerID: order.CustomerID, Amount: order.Amount},
Headers: map[string]string{"event_type": "OrderCreated", "event_id": uuid.NewString()},
}); err != nil {
return nil, fmt.Errorf("write outbox: %w", err)
}
return order, tx.Commit(ctx)
}
Либо InsertOrder и InsertOutboxMessage оба commit, либо оба rollback. Никакого race condition.
Outbox relay
Relay — отдельный компонент, не inline в handler. Запускается в errgroup рядом с chi-сервером.
Relay struct
// internal/outbox/relay.go
package outbox
type Relay struct {
queries *db.Queries
pool *pgxpool.Pool
producer *kafka.Writer
}
func NewRelay(queries *db.Queries, pool *pgxpool.Pool, producer *kafka.Writer) *Relay {
return &Relay{queries: queries, pool: pool, producer: producer}
}
func (r *Relay) Run(ctx context.Context) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := r.flush(ctx); err != nil {
slog.ErrorContext(ctx, "outbox flush failed", "error", err)
}
}
}
}
func (r *Relay) flush(ctx context.Context) error {
tx, err := r.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin relay tx: %w", err)
}
defer tx.Rollback(ctx)
msgs, err := r.queries.WithTx(tx).FetchPendingOutboxMessages(ctx, 100)
if err != nil {
return fmt.Errorf("fetch outbox: %w", err)
}
if len(msgs) == 0 {
return tx.Rollback(ctx)
}
kafkaMsgs := make([]kafka.Message, 0, len(msgs))
for _, m := range msgs {
kafkaMsgs = append(kafkaMsgs, kafka.Message{
Topic: m.Topic,
Key: []byte(m.Key),
Value: m.Payload,
Headers: decodeHeaders(m.Headers),
})
}
if err := r.producer.WriteMessages(ctx, kafkaMsgs...); err != nil {
return fmt.Errorf("write kafka: %w", err)
}
ids := make([]pgtype.UUID, len(msgs))
for i, m := range msgs {
ids[i] = m.MessageID
}
if err := r.queries.WithTx(tx).MarkOutboxMessagesSent(ctx, ids); err != nil {
return fmt.Errorf("mark sent: %w", err)
}
return tx.Commit(ctx)
}
FOR UPDATE SKIP LOCKED
SQL-запрос relay-я обязательно использует FOR UPDATE SKIP LOCKED:
-- db/query/outbox.sql
-- name: FetchPendingOutboxMessages :many
SELECT *
FROM outbox_message
WHERE published_at IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED;
SKIP LOCKED позволяет нескольким инстансам relay-я работать параллельно без блокировки друг друга: каждый берёт свою порцию строк. При рестарте relay после сбоя — unpublished строки снова доступны. At-least-once при crash между WriteMessages и MarkOutboxMessagesSent — receiver обязан быть идемпотентным.
Запуск в errgroup
// cmd/server/main.go
func run(ctx context.Context, cfg *Config) error {
pool, _ := pgxpool.New(ctx, cfg.DatabaseURL)
queries := db.New(pool)
producer := &kafka.Writer{Addr: kafka.TCP(cfg.KafkaBrokers...), Balancer: &kafka.Hash{}}
outboxWriter := outbox.NewWriter(queries)
outboxRelay := outbox.NewRelay(queries, pool, producer)
router := chi.NewRouter()
registerRoutes(router, queries, pool, outboxWriter)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return http.ListenAndServe(cfg.Addr, router)
})
g.Go(func() error {
return outboxRelay.Run(gctx)
})
return g.Wait()
}
Single source of truth
R-DIST-OBX-3: БД сервиса — единственный источник правды; Kafka — транспорт.
Что это даёт:
- Потеря Kafka-данных (retention истёк, broker сломался) —
outbox_messageпродолжает накапливать; после восстановления broker relay публикует пропущенное. - Rebuild read-projection — скрипт перечитывает
outbox_messageи публикует события повторно; downstream consumer обрабатывает идемпотентно. - Полный audit — каждое событие, которое сервис когда-либо породил, остаётся в таблице.
Сравнить с «Kafka как source of truth»: требует Kafka Connect / KStreams, retention становится критичным, потеря брокера — потеря истории. UCP выбирает PG как SoT.
Inbox pattern
R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_message до обработки; отдельный воркер обрабатывает unprocessed-строки в своём темпе.
Схема inbox
CREATE TABLE inbox_message (
event_id uuid PRIMARY KEY,
received_at timestamptz NOT NULL DEFAULT now(),
topic text NOT NULL,
payload jsonb NOT NULL,
processed boolean NOT NULL DEFAULT false,
processed_at timestamptz
);
CREATE INDEX ix_inbox_message_unprocessed ON inbox_message(received_at)
WHERE NOT processed;
Consumer + воркер
// adapters/in/kafka/payment_consumer.go
type PaymentConsumer struct {
queries *db.Queries
pool *pgxpool.Pool
reader *kafka.Reader
worker *InboxWorker
}
func (c *PaymentConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
return fmt.Errorf("fetch message: %w", err)
}
eventID := headerValue(msg.Headers, "event_id")
if eventID == "" {
_ = c.reader.CommitMessages(ctx, msg)
continue
}
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin inbox tx: %w", err)
}
err = c.queries.WithTx(tx).InsertInboxMessage(ctx, db.InsertInboxMessageParams{
EventID: mustParseUUID(eventID),
Topic: msg.Topic,
Payload: msg.Value,
})
if err != nil && !isDuplicateKey(err) {
tx.Rollback(ctx)
return fmt.Errorf("insert inbox: %w", err)
}
if commitErr := tx.Commit(ctx); commitErr != nil {
return fmt.Errorf("commit inbox: %w", commitErr)
}
_ = c.reader.CommitMessages(ctx, msg)
}
}
// adapters/in/kafka/inbox_worker.go
type InboxWorker struct {
queries *db.Queries
pool *pgxpool.Pool
handler PaymentEventHandler
}
func (w *InboxWorker) Run(ctx context.Context) error {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := w.processNext(ctx); err != nil {
slog.ErrorContext(ctx, "inbox worker error", "error", err)
}
}
}
}
func (w *InboxWorker) processNext(ctx context.Context) error {
tx, err := w.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin worker tx: %w", err)
}
defer tx.Rollback(ctx)
rows, err := w.queries.WithTx(tx).FetchUnprocessedInboxMessages(ctx, 50)
if err != nil {
return fmt.Errorf("fetch inbox: %w", err)
}
for _, row := range rows {
var event PaymentEvent
if err := json.Unmarshal(row.Payload, &event); err != nil {
slog.ErrorContext(ctx, "unmarshal inbox payload", "event_id", row.EventID, "error", err)
continue
}
if err := w.handler.Handle(ctx, tx, event); err != nil {
return fmt.Errorf("handle inbox event %s: %w", row.EventID, err)
}
if err := w.queries.WithTx(tx).MarkInboxMessageProcessed(ctx, row.EventID); err != nil {
return fmt.Errorf("mark processed: %w", err)
}
}
return tx.Commit(ctx)
}
Когда использовать inbox
- Bursty traffic — Kafka даёт burst 10 000 msg/s, обработка тяжёлая; inbox развязывает приём и обработку: consumer принимает быстро, воркер работает в своём темпе.
- Критические финансовые потоки — нужно гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при panick.
В большинстве случаев inbox избыточен. Если обработка лёгкая и DLQ достаточен — inbox добавляет сложность без пользы.
| Критерий | Только processed_event | Inbox + воркер |
|---|---|---|
| Сложность | низкая | средняя |
| Burst handling | ограничен concurrency consumer-а | развязка приёма и обработки |
| Recovery после crash | re-consume из Kafka | отдельная обработка inbox-строк |
| Когда применять | дефолт | финансовые потоки / высокий burst |
Dedup через processed_event — дефолтный путь
R-DIST-IDEM-2: для большинства consumer-ов достаточно processed_event dedup-таблицы, проверяемой в той же транзакции — без inbox.
// adapters/in/kafka/consumer.go
func (c *Consumer) handleMessage(ctx context.Context, msg kafka.Message) error {
eventID := headerValue(msg.Headers, "event_id")
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
qtx := c.queries.WithTx(tx)
exists, err := qtx.ExistsProcessedEvent(ctx, eventID)
if err != nil {
return fmt.Errorf("check processed event: %w", err)
}
if exists {
return nil
}
if err := c.processEvent(ctx, qtx, msg); err != nil {
return err
}
if err := qtx.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
EventID: eventID,
Topic: msg.Topic,
ExpiresAt: time.Now().Add(72 * time.Hour),
}); err != nil {
return fmt.Errorf("insert processed event: %w", err)
}
return tx.Commit(ctx)
}
Проверка и запись в одной транзакции — нет TOCTOU; при повторной доставке того же event_id handler вернёт nil идемпотентно.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
producer.WriteMessages внутри pgx.Tx handler-а | R-DIST-OBX-X1 | outbox.Writer.Write(ctx, tx, msg) в той же транзакции |
| Горутина-after-commit без outbox | R-DIST-OBX-X2 | outbox relay с FOR UPDATE SKIP LOCKED |
Relay без FOR UPDATE SKIP LOCKED | R-DIST-OBX-1 | SKIP LOCKED для параллельных relay-инстансов |
Outbox без partial-index WHERE published_at IS NULL | R-DIST-OBX-1 | partial index на partial scan |
| Kafka как source of truth | R-DIST-OBX-3 | PG — SoT; Kafka — транспорт |
| Inbox для каждого consumer-а по умолчанию | R-DIST-OBX-2 | processed_event dedup дефолт; inbox — только critical |
| DELETE published rows из outbox | R-DIST-OBX-3 | hold для audit/rebuild; cleanup отдельным job-ом по retention |
Relay через time.Sleep в горутине | R-DIST-OBX-1 | time.NewTicker + errgroup с graceful shutdown |
Куда дальше
- Idempotency — receiver обязан быть идемпотентным при at-least-once;
processed_eventdedup-таблица. - Saga — оркестрация vs хореография — saga-state-таблица и outbox работают вместе;
outbox.Writeв каждом переходе. - Eventual consistency — outbox — главный механизм EC; лаг проекции как
read_projection_lag_seconds. - Compensation — compensation-команды публикуются через outbox в той же
pgx.Tx. - Distributed transactions — что не делать — почему
tx1.Commit+tx2.Commitне замена outbox. - Когда нужны распределённые паттерны — outbox актуален только при cross-service операции.