Опирается на правила: R-DIST-TX-X1R-DIST-TX-X3 и R-DIST-TX-1R-DIST-TX-3 из Distributed Patterns Style Guide → раздел 7. Distributed transactions — что НЕ делать.

Важно знать

  • В Go нет стандартного XA-менеджера: database/sql не поддерживает XA-протокол, pgx тоже.
  • Последовательный tx1.Commit(ctx) + tx2.Commit(ctx) через два pgxpool.Pool — это best-effort без recovery. Если второй commit упал — inconsistency без плана исправления.
  • Kafka — главный брокер — не реализует XA. Ни один современный вызов kafka-go WriteMessages не входит в XA-транзакцию.
  • pgx.Conn с ручным PREPARE TRANSACTION / COMMIT PREPARED (PostgreSQL двухфазный commit) — технически возможен, но без transaction coordinator и recovery-логики воспроизводит все проблемы 2PC без его инфраструктуры.
  • Альтернатива 1: Saga — каждый шаг коммитит свою pgx.Tx, при сбое orchestrator посылает compensation через outbox.
  • Альтернатива 2: Outbox + idempotent consumerINSERT бизнес-записи и INSERT в outbox в одной pgx.Tx; relay публикует в Kafka.
  • Альтернатива 3: Modular monolith — несколько BC в одном Go-бинаре с одним pgxpool.Pool; одна pgx.Tx — ACID без распределённой сложности.
  • «Мне нужна немедленная консистентность между двумя сервисами» — почти всегда неверная граница BC или реальная EC, которую надо задекларировать.

Почему 2PC / XA неприменимы в Go-стеке

database/sql и pgx не поддерживают XA

Java-мир знает javax.transaction.xa.XAResource, который реализуют JDBC-драйверы. В Go нет аналога: стандартная библиотека database/sql не содержит XA-интерфейса. Пакет pgx/v5 предоставляет только pgx.Tx с обычным BEGIN / COMMIT / ROLLBACK.

Теоретически PostgreSQL поддерживает двухфазный commit через PREPARE TRANSACTION '<xid>' и COMMIT PREPARED '<xid>'. Вызов этих SQL-команд через pgx возможен вручную:

// AVOID — PostgreSQL PREPARE TRANSACTION без coordinator и recovery
tx, _ := pool.Begin(ctx)
_, _ = tx.Exec(ctx, "PREPARE TRANSACTION 'order-saga-abc123'")
// ... сетевой вызов к payment-service ...
_, _ = pool.Exec(ctx, "COMMIT PREPARED 'order-saga-abc123'")

Проблема та же, что и в любом 2PC: если процесс упал после PREPARE TRANSACTION, но до COMMIT PREPARED — транзакция зависает в состоянии in_doubt в pg_prepared_xacts. Без отдельного coordinator-сервиса с персистентным журналом разрешить её автоматически невозможно. «Hanging prepared transactions» блокируют vacuum и autovacuum в PG.

Kafka не входит в XA

kafka-go Writer.WriteMessages не участвует ни в каком XA. Если бизнес-операция требует атомарного «запись в PG + публикация в Kafka» — XA здесь невозможен в принципе. Это фундаментальное ограничение, а не недостаток конкретной библиотеки.

Последовательный commit двух pgx.Tx — R-DIST-TX-X3

// AVOID — нет атомарности, нет recovery при сбое между commit'ами
tx1, _ := orderPool.Begin(ctx)
tx2, _ := paymentPool.Begin(ctx)
defer tx1.Rollback(ctx)
defer tx2.Rollback(ctx)

_ = qtxOrder.UpdateOrderStatus(ctx, ...)
_ = qtxPayment.DebitAccount(ctx, ...)

if err := tx1.Commit(ctx); err != nil {
    return err
}
// Process killed здесь → tx2 в paymentPool остался закоммиченным
if err := tx2.Commit(ctx); err != nil {
    return err
}

Если процесс упал или сеть пропала между двумя Commit — одна БД уже содержит изменения, вторая — нет. defer tx2.Rollback(ctx) уже не поможет: tx1 коммитнул. Inconsistency без recovery-плана. Это нарушение R-DIST-TX-X3.

Альтернативы

Saga с локальными транзакциями — R-DIST-TX-1

Каждый сервис коммитит только свою pgx.Tx. Orchestrator хранит состояние переходов в saga_<name> таблице и при сбое посылает compensation.

Полная схема для Order → Payment → Inventory:

// internal/saga/order_saga.go
func (o *OrderSagaOrchestrator) Start(ctx context.Context, tx pgx.Tx, cmd StartOrderSagaCommand) (uuid.UUID, error) {
    sagaID := uuid.New()
    qtx := o.queries.WithTx(tx)
    if err := qtx.InsertOrderSaga(ctx, db.InsertOrderSagaParams{
        SagaID:      sagaID,
        OrderID:     cmd.OrderID,
        Status:      "payment_pending",
        CurrentStep: "reserve_payment",
        Payload:     cmd.Payload,
    }); err != nil {
        return uuid.Nil, fmt.Errorf("insert order saga: %w", err)
    }
    return sagaID, o.outbox.Write(ctx, tx, OutboxMessage{
        SagaID:  sagaID,
        Topic:   "payment.commands",
        Payload: ReservePaymentCommand{SagaID: sagaID, OrderID: cmd.OrderID, Amount: cmd.Amount},
    })
}

func (o *OrderSagaOrchestrator) OnInventoryFailed(ctx context.Context, tx pgx.Tx, event InventoryFailedEvent) error {
    qtx := o.queries.WithTx(tx)
    if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
        SagaID:      event.SagaID,
        Status:      "compensating",
        CurrentStep: "refund_payment",
    }); err != nil {
        return fmt.Errorf("update saga to compensating: %w", err)
    }
    return o.outbox.Write(ctx, tx, OutboxMessage{
        SagaID:  event.SagaID,
        Topic:   "payment.commands",
        Payload: RefundPaymentCommand{
            SagaID:          event.SagaID,
            OriginalOrderID: event.OrderID,
            Reason:          "inventory_unavailable",
        },
    })
}

Каждый pgx.Tx коммитит только в своей БД. Saga-state в таблице позволяет восстановить in-flight saga после рестарта — поллинг saga_order по статусу IN_PROGRESS при старте сервиса.

UseCase Handler запускает saga, не содержит переходов:

// core/order/handler/create_order_handler.go
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*OrderCreatedResult, error) {
    tx, err := h.pool.Begin(ctx)
    if err != nil {
        return nil, fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    qtx := h.queries.WithTx(tx)
    orderID := uuid.New()
    if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
        OrderID:    orderID,
        CustomerID: cmd.CustomerID,
        Status:     "pending",
        TotalSber:  cmd.TotalSber,
    }); err != nil {
        return nil, fmt.Errorf("insert order: %w", err)
    }

    sagaID, err := h.saga.Start(ctx, tx, StartOrderSagaCommand{
        OrderID: orderID,
        Amount:  cmd.TotalSber,
        Payload: cmd.Payload,
    })
    if err != nil {
        return nil, fmt.Errorf("start order saga: %w", err)
    }

    if err := tx.Commit(ctx); err != nil {
        return nil, fmt.Errorf("commit: %w", err)
    }
    return &OrderCreatedResult{OrderID: orderID, SagaID: sagaID}, nil
}

Outbox + idempotent consumer — R-DIST-TX-2

Для event-driven синхронизации без rollback'ов: INSERT бизнес-записи и INSERT в outbox_message в одной pgx.Tx. Relay читает outbox и публикует в Kafka. Receiver дедуплицирует по event_id.

// internal/outbox/writer.go
func (w *Writer) Write(ctx context.Context, tx pgx.Tx, msg Message) error {
    payload, err := json.Marshal(msg.Payload)
    if err != nil {
        return fmt.Errorf("marshal outbox payload: %w", err)
    }
    return w.queries.WithTx(tx).InsertOutboxMessage(ctx, db.InsertOutboxMessageParams{
        MessageID: uuid.New(),
        Topic:     msg.Topic,
        Key:       msg.Key,
        Payload:   payload,
        Headers:   encodeHeaders(msg.Headers),
        CreatedAt: time.Now(),
    })
}
// internal/outbox/relay.go
func (r *Relay) flush(ctx context.Context) error {
    msgs, err := r.queries.FetchPendingOutboxMessages(ctx, 100)
    if err != nil {
        return fmt.Errorf("fetch outbox: %w", err)
    }
    for _, m := range msgs {
        if err := r.producer.WriteMessages(ctx, kafka.Message{
            Topic:   m.Topic,
            Key:     []byte(m.Key),
            Value:   m.Payload,
            Headers: decodeHeaders(m.Headers),
        }); err != nil {
            return fmt.Errorf("write kafka: %w", err)
        }
        if err := r.queries.MarkOutboxMessageSent(ctx, m.MessageID); err != nil {
            return fmt.Errorf("mark sent: %w", err)
        }
    }
    return nil
}

Relay запускается в отдельной горутине через errgroup:

// cmd/server/main.go
g, gctx := errgroup.WithContext(ctx)

g.Go(func() error {
    return outboxRelay.Run(gctx)
})
g.Go(func() error {
    return httpServer.ListenAndServe()
})

if err := g.Wait(); err != nil {
    slog.ErrorContext(ctx, "service stopped", "error", err)
}

Outbox — единственный путь публикации в Kafka. Прямой вызов producer.WriteMessages из handler запрещён (R-DIST-OBX-X1).

Modular monolith — R-DIST-TX-3

Когда Order, Payment и Customer тесно связаны и независимая масштабируемость не нужна — один Go-бинар с одним pgxpool.Pool. Одна pgx.Tx покрывает все операции атомарно:

// core/order/handler/create_order_handler.go — modular monolith
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*OrderCreatedResult, error) {
    tx, err := h.pool.Begin(ctx)
    if err != nil {
        return nil, fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    qtx := h.queries.WithTx(tx)

    orderID := uuid.New()
    if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
        OrderID:    orderID,
        CustomerID: cmd.CustomerID,
        Status:     "confirmed",
    }); err != nil {
        return nil, fmt.Errorf("insert order: %w", err)
    }

    if err := qtx.DebitCustomerBalance(ctx, db.DebitCustomerBalanceParams{
        CustomerID: cmd.CustomerID,
        Amount:     cmd.TotalSber,
    }); err != nil {
        return nil, fmt.Errorf("debit balance: %w", err)
    }

    if err := qtx.ReserveProductStock(ctx, db.ReserveProductStockParams{
        ProductID: cmd.ProductID,
        Quantity:  cmd.Quantity,
    }); err != nil {
        return nil, fmt.Errorf("reserve stock: %w", err)
    }

    if err := tx.Commit(ctx); err != nil {
        return nil, fmt.Errorf("commit: %w", err)
    }
    return &OrderCreatedResult{OrderID: orderID}, nil
}

Все три операции — одна транзакция в одном PG. Никаких саг, никаких compensation, никакого outbox-relay. Если команда вырастет или потребуется независимый деплой payment — тогда вводим saga. До этого порога monolith проще и надёжнее.

«Мне нужна немедленная консистентность между сервисами»

Это требование почти всегда означает одно из трёх:

  1. Неверная граница BC — если две операции требуют немедленной консистентности, они скорее всего принадлежат одному Bounded Context. Объединить в один сервис с одним pgxpool.Pool.
  2. Реальная eventual consistency — на практике нужна быстрая (< 1 с) EC, а не строгая синхронность. Задекларировать в OpenAPI, измерить read_projection_lag_seconds как Prometheus-метрику.
  3. Read-your-writes — клиент должен увидеть свой результат сразу. Решается специальным endpoint-ом из write-side или передачей version-токена клиенту в ответе.

«Хочу 2PC» — почти всегда симптом, не требование.

Что запрещено

АнтипаттернПравилоЧто взамен
PREPARE TRANSACTION / COMMIT PREPARED вручную без coordinatorR-DIST-TX-X1saga с локальными pgx.Tx
Последовательный tx1.Commit + tx2.Commit разных БД без saga-recoveryR-DIST-TX-X3один pgx.Tx (monolith) или saga
Прямой producer.WriteMessages из handler внутри транзакцииR-DIST-OBX-X1outbox writer в той же pgx.Tx
pgx.Tx открытый через сетевой HTTP-вызов к другому сервисуR-DIST-TX-X1outbox + saga
Два pgxpool.Pool (два сервиса) + best-effort последовательностьR-DIST-TX-X3saga с compensation
Goroutine-after-commit для публикации в KafkaR-DIST-OBX-X2outbox relay

Куда дальше

  • Saga в идиомах Go — orchestrator struct, переходы через Advance, state в БД через sqlc.
  • Outbox + Inbox в идиомах Go — writer, relay через errgroup, inbox для critical-сценариев.
  • Eventual consistency в идиомах Go — version-поля, read_projection_lag_seconds, read-your-writes.
  • Idempotency в идиомах Go — middleware с (idempotency_key, command_hash), processed_event, TTL.
  • Compensation в идиомах Go — semantic refund, audit trail, DLQ при сбое compensation.
  • Когда нужны распределённые паттерны — modular monolith как альтернатива, критерии выбора.