Опирается на правила:
R-DIST-TX-X1…R-DIST-TX-X3иR-DIST-TX-1…R-DIST-TX-3из Distributed Patterns Style Guide → раздел 7. Distributed transactions — что НЕ делать.
Важно знать
- В Go нет стандартного XA-менеджера:
database/sqlне поддерживает XA-протокол,pgxтоже.- Последовательный
tx1.Commit(ctx)+tx2.Commit(ctx)через дваpgxpool.Pool— это best-effort без recovery. Если второй commit упал — inconsistency без плана исправления.- Kafka — главный брокер — не реализует XA. Ни один современный вызов
kafka-go WriteMessagesне входит в XA-транзакцию.pgx.Connс ручнымPREPARE TRANSACTION/COMMIT PREPARED(PostgreSQL двухфазный commit) — технически возможен, но без transaction coordinator и recovery-логики воспроизводит все проблемы 2PC без его инфраструктуры.- Альтернатива 1: Saga — каждый шаг коммитит свою
pgx.Tx, при сбое orchestrator посылает compensation через outbox.- Альтернатива 2: Outbox + idempotent consumer —
INSERTбизнес-записи иINSERTв outbox в однойpgx.Tx; relay публикует в Kafka.- Альтернатива 3: Modular monolith — несколько BC в одном Go-бинаре с одним
pgxpool.Pool; однаpgx.Tx— ACID без распределённой сложности.- «Мне нужна немедленная консистентность между двумя сервисами» — почти всегда неверная граница BC или реальная EC, которую надо задекларировать.
Почему 2PC / XA неприменимы в Go-стеке
database/sql и pgx не поддерживают XA
Java-мир знает javax.transaction.xa.XAResource, который реализуют JDBC-драйверы. В Go нет аналога: стандартная библиотека database/sql не содержит XA-интерфейса. Пакет pgx/v5 предоставляет только pgx.Tx с обычным BEGIN / COMMIT / ROLLBACK.
Теоретически PostgreSQL поддерживает двухфазный commit через PREPARE TRANSACTION '<xid>' и COMMIT PREPARED '<xid>'. Вызов этих SQL-команд через pgx возможен вручную:
// AVOID — PostgreSQL PREPARE TRANSACTION без coordinator и recovery
tx, _ := pool.Begin(ctx)
_, _ = tx.Exec(ctx, "PREPARE TRANSACTION 'order-saga-abc123'")
// ... сетевой вызов к payment-service ...
_, _ = pool.Exec(ctx, "COMMIT PREPARED 'order-saga-abc123'")
Проблема та же, что и в любом 2PC: если процесс упал после PREPARE TRANSACTION, но до COMMIT PREPARED — транзакция зависает в состоянии in_doubt в pg_prepared_xacts. Без отдельного coordinator-сервиса с персистентным журналом разрешить её автоматически невозможно. «Hanging prepared transactions» блокируют vacuum и autovacuum в PG.
Kafka не входит в XA
kafka-go Writer.WriteMessages не участвует ни в каком XA. Если бизнес-операция требует атомарного «запись в PG + публикация в Kafka» — XA здесь невозможен в принципе. Это фундаментальное ограничение, а не недостаток конкретной библиотеки.
Последовательный commit двух pgx.Tx — R-DIST-TX-X3
// AVOID — нет атомарности, нет recovery при сбое между commit'ами
tx1, _ := orderPool.Begin(ctx)
tx2, _ := paymentPool.Begin(ctx)
defer tx1.Rollback(ctx)
defer tx2.Rollback(ctx)
_ = qtxOrder.UpdateOrderStatus(ctx, ...)
_ = qtxPayment.DebitAccount(ctx, ...)
if err := tx1.Commit(ctx); err != nil {
return err
}
// Process killed здесь → tx2 в paymentPool остался закоммиченным
if err := tx2.Commit(ctx); err != nil {
return err
}
Если процесс упал или сеть пропала между двумя Commit — одна БД уже содержит изменения, вторая — нет. defer tx2.Rollback(ctx) уже не поможет: tx1 коммитнул. Inconsistency без recovery-плана. Это нарушение R-DIST-TX-X3.
Альтернативы
Saga с локальными транзакциями — R-DIST-TX-1
Каждый сервис коммитит только свою pgx.Tx. Orchestrator хранит состояние переходов в saga_<name> таблице и при сбое посылает compensation.
Полная схема для Order → Payment → Inventory:
// internal/saga/order_saga.go
func (o *OrderSagaOrchestrator) Start(ctx context.Context, tx pgx.Tx, cmd StartOrderSagaCommand) (uuid.UUID, error) {
sagaID := uuid.New()
qtx := o.queries.WithTx(tx)
if err := qtx.InsertOrderSaga(ctx, db.InsertOrderSagaParams{
SagaID: sagaID,
OrderID: cmd.OrderID,
Status: "payment_pending",
CurrentStep: "reserve_payment",
Payload: cmd.Payload,
}); err != nil {
return uuid.Nil, fmt.Errorf("insert order saga: %w", err)
}
return sagaID, o.outbox.Write(ctx, tx, OutboxMessage{
SagaID: sagaID,
Topic: "payment.commands",
Payload: ReservePaymentCommand{SagaID: sagaID, OrderID: cmd.OrderID, Amount: cmd.Amount},
})
}
func (o *OrderSagaOrchestrator) OnInventoryFailed(ctx context.Context, tx pgx.Tx, event InventoryFailedEvent) error {
qtx := o.queries.WithTx(tx)
if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
SagaID: event.SagaID,
Status: "compensating",
CurrentStep: "refund_payment",
}); err != nil {
return fmt.Errorf("update saga to compensating: %w", err)
}
return o.outbox.Write(ctx, tx, OutboxMessage{
SagaID: event.SagaID,
Topic: "payment.commands",
Payload: RefundPaymentCommand{
SagaID: event.SagaID,
OriginalOrderID: event.OrderID,
Reason: "inventory_unavailable",
},
})
}
Каждый pgx.Tx коммитит только в своей БД. Saga-state в таблице позволяет восстановить in-flight saga после рестарта — поллинг saga_order по статусу IN_PROGRESS при старте сервиса.
UseCase Handler запускает saga, не содержит переходов:
// core/order/handler/create_order_handler.go
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*OrderCreatedResult, error) {
tx, err := h.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
qtx := h.queries.WithTx(tx)
orderID := uuid.New()
if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
OrderID: orderID,
CustomerID: cmd.CustomerID,
Status: "pending",
TotalSber: cmd.TotalSber,
}); err != nil {
return nil, fmt.Errorf("insert order: %w", err)
}
sagaID, err := h.saga.Start(ctx, tx, StartOrderSagaCommand{
OrderID: orderID,
Amount: cmd.TotalSber,
Payload: cmd.Payload,
})
if err != nil {
return nil, fmt.Errorf("start order saga: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("commit: %w", err)
}
return &OrderCreatedResult{OrderID: orderID, SagaID: sagaID}, nil
}
Outbox + idempotent consumer — R-DIST-TX-2
Для event-driven синхронизации без rollback'ов: INSERT бизнес-записи и INSERT в outbox_message в одной pgx.Tx. Relay читает outbox и публикует в Kafka. Receiver дедуплицирует по event_id.
// internal/outbox/writer.go
func (w *Writer) Write(ctx context.Context, tx pgx.Tx, msg Message) error {
payload, err := json.Marshal(msg.Payload)
if err != nil {
return fmt.Errorf("marshal outbox payload: %w", err)
}
return w.queries.WithTx(tx).InsertOutboxMessage(ctx, db.InsertOutboxMessageParams{
MessageID: uuid.New(),
Topic: msg.Topic,
Key: msg.Key,
Payload: payload,
Headers: encodeHeaders(msg.Headers),
CreatedAt: time.Now(),
})
}
// internal/outbox/relay.go
func (r *Relay) flush(ctx context.Context) error {
msgs, err := r.queries.FetchPendingOutboxMessages(ctx, 100)
if err != nil {
return fmt.Errorf("fetch outbox: %w", err)
}
for _, m := range msgs {
if err := r.producer.WriteMessages(ctx, kafka.Message{
Topic: m.Topic,
Key: []byte(m.Key),
Value: m.Payload,
Headers: decodeHeaders(m.Headers),
}); err != nil {
return fmt.Errorf("write kafka: %w", err)
}
if err := r.queries.MarkOutboxMessageSent(ctx, m.MessageID); err != nil {
return fmt.Errorf("mark sent: %w", err)
}
}
return nil
}
Relay запускается в отдельной горутине через errgroup:
// cmd/server/main.go
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return outboxRelay.Run(gctx)
})
g.Go(func() error {
return httpServer.ListenAndServe()
})
if err := g.Wait(); err != nil {
slog.ErrorContext(ctx, "service stopped", "error", err)
}
Outbox — единственный путь публикации в Kafka. Прямой вызов producer.WriteMessages из handler запрещён (R-DIST-OBX-X1).
Modular monolith — R-DIST-TX-3
Когда Order, Payment и Customer тесно связаны и независимая масштабируемость не нужна — один Go-бинар с одним pgxpool.Pool. Одна pgx.Tx покрывает все операции атомарно:
// core/order/handler/create_order_handler.go — modular monolith
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (*OrderCreatedResult, error) {
tx, err := h.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
qtx := h.queries.WithTx(tx)
orderID := uuid.New()
if err := qtx.InsertOrder(ctx, db.InsertOrderParams{
OrderID: orderID,
CustomerID: cmd.CustomerID,
Status: "confirmed",
}); err != nil {
return nil, fmt.Errorf("insert order: %w", err)
}
if err := qtx.DebitCustomerBalance(ctx, db.DebitCustomerBalanceParams{
CustomerID: cmd.CustomerID,
Amount: cmd.TotalSber,
}); err != nil {
return nil, fmt.Errorf("debit balance: %w", err)
}
if err := qtx.ReserveProductStock(ctx, db.ReserveProductStockParams{
ProductID: cmd.ProductID,
Quantity: cmd.Quantity,
}); err != nil {
return nil, fmt.Errorf("reserve stock: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("commit: %w", err)
}
return &OrderCreatedResult{OrderID: orderID}, nil
}
Все три операции — одна транзакция в одном PG. Никаких саг, никаких compensation, никакого outbox-relay. Если команда вырастет или потребуется независимый деплой payment — тогда вводим saga. До этого порога monolith проще и надёжнее.
«Мне нужна немедленная консистентность между сервисами»
Это требование почти всегда означает одно из трёх:
- Неверная граница BC — если две операции требуют немедленной консистентности, они скорее всего принадлежат одному Bounded Context. Объединить в один сервис с одним
pgxpool.Pool. - Реальная eventual consistency — на практике нужна быстрая (< 1 с) EC, а не строгая синхронность. Задекларировать в OpenAPI, измерить
read_projection_lag_secondsкак Prometheus-метрику. - Read-your-writes — клиент должен увидеть свой результат сразу. Решается специальным endpoint-ом из write-side или передачей
version-токена клиенту в ответе.
«Хочу 2PC» — почти всегда симптом, не требование.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
PREPARE TRANSACTION / COMMIT PREPARED вручную без coordinator | R-DIST-TX-X1 | saga с локальными pgx.Tx |
Последовательный tx1.Commit + tx2.Commit разных БД без saga-recovery | R-DIST-TX-X3 | один pgx.Tx (monolith) или saga |
Прямой producer.WriteMessages из handler внутри транзакции | R-DIST-OBX-X1 | outbox writer в той же pgx.Tx |
pgx.Tx открытый через сетевой HTTP-вызов к другому сервису | R-DIST-TX-X1 | outbox + saga |
Два pgxpool.Pool (два сервиса) + best-effort последовательность | R-DIST-TX-X3 | saga с compensation |
| Goroutine-after-commit для публикации в Kafka | R-DIST-OBX-X2 | outbox relay |
Куда дальше
- Saga в идиомах Go — orchestrator struct, переходы через
Advance, state в БД через sqlc. - Outbox + Inbox в идиомах Go — writer, relay через errgroup, inbox для critical-сценариев.
- Eventual consistency в идиомах Go — version-поля,
read_projection_lag_seconds, read-your-writes. - Idempotency в идиомах Go — middleware с
(idempotency_key, command_hash),processed_event, TTL. - Compensation в идиомах Go — semantic refund, audit trail, DLQ при сбое compensation.
- Когда нужны распределённые паттерны — modular monolith как альтернатива, критерии выбора.