Опирается на правила: R-DIST-SAGA-1R-DIST-SAGA-5 и R-DIST-SAGA-X1R-DIST-SAGA-X4 из Distributed Patterns — раздел 2. Saga — оркестрация vs хореография.

Важно знать

  • Saga — серия локальных транзакций (pgx.Tx в каждом сервисе) + compensation. В Go нет XA и @Transactional — транзакция явная, пробрасываемая параметром.
  • Orchestration (OrderSagaOrchestrator в internal/saga/) — для сложных саг 4+ шагов или с branching. Весь flow читается в одном файле.
  • Choreography (kafka-события без координатора) — для 2–3 шагов без branching. Подходит, пока сага умещается в двух сервисах.
  • Saga state хранится в saga_<name> таблице PostgreSQL: видимость in-flight саг, recovery после рестарта, audit. State только in-memory — запрещён.
  • saga_id (UUID) пробрасывается в каждое Kafka-сообщение и HTTP-заголовок между сервисами.
  • Orchestrator — отдельный компонент (internal/saga/), не UseCase Handler. Handler делает только локальный шаг и пишет в outbox.
  • Go не имеет стандартного XA-менеджера; последовательный commit двух pgx.Tx из разных БД — best-effort без recovery, не атомарность.

Saga — главный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» охватывает Payment, Inventory и Order — каждый со своим PostgreSQL и своими транзакциями — saga собирает их в согласованную бизнес-операцию через локальные pgx.Tx и compensation при сбое.

Когда применять Saga

R-DIST-SAGA-1: Saga применяется когда выполнены все три условия:

  1. Операция охватывает 2+ сервиса (Order → Payment → Inventory).
  2. Каждый шаг должен быть локально transactional — commit в свой PG через pgx.Tx.
  3. Нужна возможность compensation при сбое промежуточного шага.

Если третье условие отсутствует (можно «дотолкать» retry-ями без отката предыдущих) — достаточно outbox + idempotent consumer, saga избыточна.

Order         Payment       Inventory
  |  StartSaga  |              |
  |------------>|              |
  |             |  Reserve     |
  |             |<-------------|
  |             |  Reserved    |
  |             |------------->|
  |  Completed  |              |
  |<------------|              |

Orchestration — для complex sagas

R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный координатор — struct OrderSagaOrchestrator в internal/saga/, реагирует на события шагов, продвигает state в БД, отправляет следующую команду через outbox.

// internal/saga/order_saga.go
package saga

type OrderSagaOrchestrator struct {
    queries *db.Queries
    pool    *pgxpool.Pool
    outbox  outbox.Writer
}

func NewOrderSagaOrchestrator(queries *db.Queries, pool *pgxpool.Pool, outbox outbox.Writer) *OrderSagaOrchestrator {
    return &OrderSagaOrchestrator{queries: queries, pool: pool, outbox: outbox}
}

func (o *OrderSagaOrchestrator) Start(ctx context.Context, tx pgx.Tx, cmd StartOrderSagaCommand) (uuid.UUID, error) {
    sagaID := uuid.New()
    qtx := o.queries.WithTx(tx)
    if err := qtx.InsertOrderSaga(ctx, db.InsertOrderSagaParams{
        SagaID:      sagaID,
        OrderID:     cmd.OrderID,
        Status:      "payment_pending",
        CurrentStep: "reserve_payment",
        Payload:     cmd.Payload,
    }); err != nil {
        return uuid.Nil, fmt.Errorf("insert order saga: %w", err)
    }
    return sagaID, o.outbox.Write(ctx, tx, outbox.Message{
        SagaID:  sagaID,
        Topic:   "payment.commands",
        Payload: ReservePaymentCommand{SagaID: sagaID, OrderID: cmd.OrderID, Amount: cmd.Amount},
    })
}

func (o *OrderSagaOrchestrator) OnPaymentReserved(ctx context.Context, tx pgx.Tx, event PaymentReservedEvent) error {
    qtx := o.queries.WithTx(tx)
    if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
        SagaID:      event.SagaID,
        Status:      "inventory_pending",
        CurrentStep: "reserve_inventory",
    }); err != nil {
        return fmt.Errorf("update saga status: %w", err)
    }
    return o.outbox.Write(ctx, tx, outbox.Message{
        SagaID:  event.SagaID,
        Topic:   "inventory.commands",
        Payload: ReserveInventoryCommand{SagaID: event.SagaID, OrderID: event.OrderID, Items: event.Items},
    })
}

func (o *OrderSagaOrchestrator) OnInventoryReserved(ctx context.Context, tx pgx.Tx, event InventoryReservedEvent) error {
    qtx := o.queries.WithTx(tx)
    if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
        SagaID:      event.SagaID,
        Status:      "completed",
        CurrentStep: "terminal",
    }); err != nil {
        return fmt.Errorf("complete saga: %w", err)
    }
    return qtx.ConfirmOrder(ctx, event.OrderID)
}

Каждый метод orchestrator-а принимает уже начатую pgx.Tx — он не открывает транзакцию сам. Транзакцию открывает Kafka-consumer, передаёт её в orchestrator и коммитит по завершении.

// adapters/in/kafka/saga_consumer.go

func (c *SagaConsumer) handlePaymentReserved(ctx context.Context, msg kafka.Message) error {
    var event saga.PaymentReservedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal PaymentReservedEvent: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    if err := c.orchestrator.OnPaymentReserved(ctx, tx, event); err != nil {
        return err
    }
    return tx.Commit(ctx)
}

Плюсы orchestration: весь flow читается в internal/saga/order_saga.go, compensation-цепочка очевидна, легко добавить шаг. Минусы: координатор — точка отказа (но recovery через saga_order таблицу).

Choreography — для simple sagas

R-DIST-SAGA-3: choreography — для 2–3 шагов без branching. Каждый сервис подписан на события других и реагирует самостоятельно. Центрального координатора нет.

customer.registered → sber_bonus.events → bonus.activated
                    ↘ notify.commands   → notification.sent
// adapters/in/kafka/customer_consumer.go — в bonus-service

func (c *CustomerConsumer) OnCustomerRegistered(ctx context.Context, msg kafka.Message) error {
    var event CustomerRegisteredEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal CustomerRegisteredEvent: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    qtx := c.queries.WithTx(tx)
    if err := qtx.ActivateSberBonus(ctx, db.ActivateSberBonusParams{
        CustomerID: event.CustomerID,
        SagaID:     event.SagaID,
        Level:      "welcome",
    }); err != nil {
        return fmt.Errorf("activate sber bonus: %w", err)
    }

    if err := c.outbox.Write(ctx, tx, outbox.Message{
        SagaID:  event.SagaID,
        Topic:   "sber_bonus.events",
        Payload: BonusActivatedEvent{SagaID: event.SagaID, CustomerID: event.CustomerID},
    }); err != nil {
        return fmt.Errorf("write outbox: %w", err)
    }
    return tx.Commit(ctx)
}
ПараметрOrchestrationChoreography
Шагов4+2–3
Branchingданет
Видимость flowодин файл (internal/saga/)N сервисов
Где statesaga_order таблица у orchestrator-ау каждого сервиса
Сложность реализациисредняянизкая на старте
Сложность отладкисредняявысокая при росте

При 4+ шагах choreography требует открывать N репозиториев, чтобы восстановить картину flow в голове. Переходи к orchestration.

Saga state в БД

R-DIST-SAGA-4: state саги хранится в saga_<name> таблице PostgreSQL. Это даёт три критичных свойства: видимость (какие саги in-flight), recovery (после рестарта orchestrator-а), audit (история каждой саги).

CREATE TABLE saga_order (
    saga_id       uuid         PRIMARY KEY,
    order_id      uuid         NOT NULL,
    status        text         NOT NULL,
    current_step  text         NOT NULL,
    payload       jsonb        NOT NULL,
    started_at    timestamptz  NOT NULL DEFAULT now(),
    completed_at  timestamptz,
    last_error    text
);

CREATE INDEX ix_saga_order_status
    ON saga_order (status)
    WHERE status IN ('payment_pending', 'inventory_pending', 'compensating');

Partial index только на активных статусах — 99% строк быстро становятся "completed", искать нужно только живые.

Recovery при старте сервиса:

// internal/saga/recovery.go

func (o *OrderSagaOrchestrator) ResumeInFlight(ctx context.Context) error {
    sagas, err := o.queries.FetchInFlightOrderSagas(ctx)
    if err != nil {
        return fmt.Errorf("fetch in-flight sagas: %w", err)
    }

    for _, s := range sagas {
        slog.InfoContext(ctx, "resuming saga",
            "saga_id", s.SagaID,
            "current_step", s.CurrentStep,
            "status", s.Status,
        )
        if err := o.resume(ctx, s); err != nil {
            slog.ErrorContext(ctx, "failed to resume saga",
                "saga_id", s.SagaID,
                "error", err,
            )
        }
    }
    return nil
}

ResumeInFlight вызывается при старте сервиса до того, как Kafka-consumer начинает читать сообщения — иначе часть in-flight саг зависнет в ожидании события, которое уже было доставлено.

SagaId сквозной

R-DIST-SAGA-5: saga_id (UUID) проходит через каждое Kafka-сообщение, каждый HTTP-запрос между сервисами, каждое доменное событие. Это единственный способ корректно трассировать сагу через несколько сервисов.

// internal/saga/messages.go

type ReservePaymentCommand struct {
    SagaID  uuid.UUID `json:"saga_id"`
    OrderID uuid.UUID `json:"order_id"`
    Amount  int64     `json:"amount_kopecks"`
}

type PaymentReservedEvent struct {
    SagaID    uuid.UUID `json:"saga_id"`
    OrderID   uuid.UUID `json:"order_id"`
    PaymentID uuid.UUID `json:"payment_id"`
    Items     []Item    `json:"items"`
}

В Kafka-заголовках saga_id дублируется для slog-контекста:

func sagaIDFromHeaders(headers []kafka.Header) string {
    for _, h := range headers {
        if h.Key == "saga_id" {
            return string(h.Value)
        }
    }
    return ""
}

func (c *SagaConsumer) handle(ctx context.Context, msg kafka.Message) error {
    sagaID := sagaIDFromHeaders(msg.Headers)
    ctx = slog.With(ctx, "saga_id", sagaID)
    // ...
}

В таблицах сервисов — колонка saga_id с индексом: SELECT * FROM product_reservation WHERE saga_id = ? покажет, что произошло в этой саге на стороне Inventory.

Ошибки-значения в orchestrator-е

Go использует ошибки-значения, не исключения. Orchestrator обрабатывает ошибки явно и принимает решение о compensation в том же методе:

func (o *OrderSagaOrchestrator) OnPaymentFailed(ctx context.Context, tx pgx.Tx, event PaymentFailedEvent) error {
    qtx := o.queries.WithTx(tx)

    slog.WarnContext(ctx, "payment failed, marking saga failed",
        "saga_id", event.SagaID,
        "reason", event.Reason,
    )

    return qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
        SagaID:      event.SagaID,
        Status:      "failed",
        CurrentStep: "terminal",
        LastError:   pgtype.Text{String: event.Reason, Valid: true},
    })
}

Нет try/catch через несколько сервисов. Каждый failure-путь — отдельный метод orchestrator-а, вызванный по конкретному событию.

Что запрещено

АнтипаттернПравилоЧто взамен
Последовательный tx1.Commit + tx2.Commit разных БДR-DIST-SAGA-X1saga с локальными pgx.Tx в каждом сервисе
Saga без compensation-командR-DIST-SAGA-X2каждый шаг имеет парную compensation
State саги в map[uuid.UUID]SagaState в памятиR-DIST-SAGA-X3saga_order таблица в PG + ResumeInFlight при старте
Saga-переходы внутри UseCase HandlerR-DIST-SAGA-X4отдельный OrderSagaOrchestrator в internal/saga/
Прямой producer.WriteMessages из handler в транзакцииR-DIST-OBX-X1outbox.Write(ctx, tx, ...) в той же pgx.Tx
Choreography на 5+ шаговR-DIST-SAGA-2orchestration с координатором

Куда дальше

  • Compensation — semantic state-change, не tx.Rollback; идемпотентность через статус-проверку.
  • Outbox + Inbox — команды шагов саги публикуются через outbox в той же pgx.Tx.
  • Idempotency — каждый шаг саги обязан быть идемпотентным; processed_event таблица.
  • Eventual consistency — состояние after-saga распространяется асинхронно; read-your-writes.
  • Distributed transactions — что не делать — почему tx1.Commit + tx2.Commit не атомарность.
  • Когда нужны распределённые паттерны — saga актуальна только при cross-service операции.