Опирается на правила:
R-DIST-SAGA-1…R-DIST-SAGA-5иR-DIST-SAGA-X1…R-DIST-SAGA-X4из Distributed Patterns — раздел 2. Saga — оркестрация vs хореография.
Важно знать
- Saga — серия локальных транзакций (
pgx.Txв каждом сервисе) + compensation. В Go нет XA и@Transactional— транзакция явная, пробрасываемая параметром.- Orchestration (
OrderSagaOrchestratorвinternal/saga/) — для сложных саг 4+ шагов или с branching. Весь flow читается в одном файле.- Choreography (kafka-события без координатора) — для 2–3 шагов без branching. Подходит, пока сага умещается в двух сервисах.
- Saga state хранится в
saga_<name>таблице PostgreSQL: видимость in-flight саг, recovery после рестарта, audit. State только in-memory — запрещён.saga_id(UUID) пробрасывается в каждое Kafka-сообщение и HTTP-заголовок между сервисами.- Orchestrator — отдельный компонент (
internal/saga/), не UseCase Handler. Handler делает только локальный шаг и пишет в outbox.- Go не имеет стандартного XA-менеджера; последовательный commit двух
pgx.Txиз разных БД — best-effort без recovery, не атомарность.
Saga — главный паттерн UCP для управления cross-service бизнес-операцией. Когда «создать заказ» охватывает Payment, Inventory и Order — каждый со своим PostgreSQL и своими транзакциями — saga собирает их в согласованную бизнес-операцию через локальные pgx.Tx и compensation при сбое.
Когда применять Saga
R-DIST-SAGA-1: Saga применяется когда выполнены все три условия:
- Операция охватывает 2+ сервиса (Order → Payment → Inventory).
- Каждый шаг должен быть локально transactional — commit в свой PG через
pgx.Tx. - Нужна возможность compensation при сбое промежуточного шага.
Если третье условие отсутствует (можно «дотолкать» retry-ями без отката предыдущих) — достаточно outbox + idempotent consumer, saga избыточна.
Order Payment Inventory
| StartSaga | |
|------------>| |
| | Reserve |
| |<-------------|
| | Reserved |
| |------------->|
| Completed | |
|<------------| |
Orchestration — для complex sagas
R-DIST-SAGA-2: orchestration рекомендуется для саг 4+ шагов или с branching. Центральный координатор — struct OrderSagaOrchestrator в internal/saga/, реагирует на события шагов, продвигает state в БД, отправляет следующую команду через outbox.
// internal/saga/order_saga.go
package saga
type OrderSagaOrchestrator struct {
queries *db.Queries
pool *pgxpool.Pool
outbox outbox.Writer
}
func NewOrderSagaOrchestrator(queries *db.Queries, pool *pgxpool.Pool, outbox outbox.Writer) *OrderSagaOrchestrator {
return &OrderSagaOrchestrator{queries: queries, pool: pool, outbox: outbox}
}
func (o *OrderSagaOrchestrator) Start(ctx context.Context, tx pgx.Tx, cmd StartOrderSagaCommand) (uuid.UUID, error) {
sagaID := uuid.New()
qtx := o.queries.WithTx(tx)
if err := qtx.InsertOrderSaga(ctx, db.InsertOrderSagaParams{
SagaID: sagaID,
OrderID: cmd.OrderID,
Status: "payment_pending",
CurrentStep: "reserve_payment",
Payload: cmd.Payload,
}); err != nil {
return uuid.Nil, fmt.Errorf("insert order saga: %w", err)
}
return sagaID, o.outbox.Write(ctx, tx, outbox.Message{
SagaID: sagaID,
Topic: "payment.commands",
Payload: ReservePaymentCommand{SagaID: sagaID, OrderID: cmd.OrderID, Amount: cmd.Amount},
})
}
func (o *OrderSagaOrchestrator) OnPaymentReserved(ctx context.Context, tx pgx.Tx, event PaymentReservedEvent) error {
qtx := o.queries.WithTx(tx)
if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
SagaID: event.SagaID,
Status: "inventory_pending",
CurrentStep: "reserve_inventory",
}); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
return o.outbox.Write(ctx, tx, outbox.Message{
SagaID: event.SagaID,
Topic: "inventory.commands",
Payload: ReserveInventoryCommand{SagaID: event.SagaID, OrderID: event.OrderID, Items: event.Items},
})
}
func (o *OrderSagaOrchestrator) OnInventoryReserved(ctx context.Context, tx pgx.Tx, event InventoryReservedEvent) error {
qtx := o.queries.WithTx(tx)
if err := qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
SagaID: event.SagaID,
Status: "completed",
CurrentStep: "terminal",
}); err != nil {
return fmt.Errorf("complete saga: %w", err)
}
return qtx.ConfirmOrder(ctx, event.OrderID)
}
Каждый метод orchestrator-а принимает уже начатую pgx.Tx — он не открывает транзакцию сам. Транзакцию открывает Kafka-consumer, передаёт её в orchestrator и коммитит по завершении.
// adapters/in/kafka/saga_consumer.go
func (c *SagaConsumer) handlePaymentReserved(ctx context.Context, msg kafka.Message) error {
var event saga.PaymentReservedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal PaymentReservedEvent: %w", err)
}
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
if err := c.orchestrator.OnPaymentReserved(ctx, tx, event); err != nil {
return err
}
return tx.Commit(ctx)
}
Плюсы orchestration: весь flow читается в internal/saga/order_saga.go, compensation-цепочка очевидна, легко добавить шаг. Минусы: координатор — точка отказа (но recovery через saga_order таблицу).
Choreography — для simple sagas
R-DIST-SAGA-3: choreography — для 2–3 шагов без branching. Каждый сервис подписан на события других и реагирует самостоятельно. Центрального координатора нет.
customer.registered → sber_bonus.events → bonus.activated
↘ notify.commands → notification.sent
// adapters/in/kafka/customer_consumer.go — в bonus-service
func (c *CustomerConsumer) OnCustomerRegistered(ctx context.Context, msg kafka.Message) error {
var event CustomerRegisteredEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal CustomerRegisteredEvent: %w", err)
}
tx, err := c.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
qtx := c.queries.WithTx(tx)
if err := qtx.ActivateSberBonus(ctx, db.ActivateSberBonusParams{
CustomerID: event.CustomerID,
SagaID: event.SagaID,
Level: "welcome",
}); err != nil {
return fmt.Errorf("activate sber bonus: %w", err)
}
if err := c.outbox.Write(ctx, tx, outbox.Message{
SagaID: event.SagaID,
Topic: "sber_bonus.events",
Payload: BonusActivatedEvent{SagaID: event.SagaID, CustomerID: event.CustomerID},
}); err != nil {
return fmt.Errorf("write outbox: %w", err)
}
return tx.Commit(ctx)
}
| Параметр | Orchestration | Choreography |
|---|---|---|
| Шагов | 4+ | 2–3 |
| Branching | да | нет |
| Видимость flow | один файл (internal/saga/) | N сервисов |
| Где state | saga_order таблица у orchestrator-а | у каждого сервиса |
| Сложность реализации | средняя | низкая на старте |
| Сложность отладки | средняя | высокая при росте |
При 4+ шагах choreography требует открывать N репозиториев, чтобы восстановить картину flow в голове. Переходи к orchestration.
Saga state в БД
R-DIST-SAGA-4: state саги хранится в saga_<name> таблице PostgreSQL. Это даёт три критичных свойства: видимость (какие саги in-flight), recovery (после рестарта orchestrator-а), audit (история каждой саги).
CREATE TABLE saga_order (
saga_id uuid PRIMARY KEY,
order_id uuid NOT NULL,
status text NOT NULL,
current_step text NOT NULL,
payload jsonb NOT NULL,
started_at timestamptz NOT NULL DEFAULT now(),
completed_at timestamptz,
last_error text
);
CREATE INDEX ix_saga_order_status
ON saga_order (status)
WHERE status IN ('payment_pending', 'inventory_pending', 'compensating');
Partial index только на активных статусах — 99% строк быстро становятся "completed", искать нужно только живые.
Recovery при старте сервиса:
// internal/saga/recovery.go
func (o *OrderSagaOrchestrator) ResumeInFlight(ctx context.Context) error {
sagas, err := o.queries.FetchInFlightOrderSagas(ctx)
if err != nil {
return fmt.Errorf("fetch in-flight sagas: %w", err)
}
for _, s := range sagas {
slog.InfoContext(ctx, "resuming saga",
"saga_id", s.SagaID,
"current_step", s.CurrentStep,
"status", s.Status,
)
if err := o.resume(ctx, s); err != nil {
slog.ErrorContext(ctx, "failed to resume saga",
"saga_id", s.SagaID,
"error", err,
)
}
}
return nil
}
ResumeInFlight вызывается при старте сервиса до того, как Kafka-consumer начинает читать сообщения — иначе часть in-flight саг зависнет в ожидании события, которое уже было доставлено.
SagaId сквозной
R-DIST-SAGA-5: saga_id (UUID) проходит через каждое Kafka-сообщение, каждый HTTP-запрос между сервисами, каждое доменное событие. Это единственный способ корректно трассировать сагу через несколько сервисов.
// internal/saga/messages.go
type ReservePaymentCommand struct {
SagaID uuid.UUID `json:"saga_id"`
OrderID uuid.UUID `json:"order_id"`
Amount int64 `json:"amount_kopecks"`
}
type PaymentReservedEvent struct {
SagaID uuid.UUID `json:"saga_id"`
OrderID uuid.UUID `json:"order_id"`
PaymentID uuid.UUID `json:"payment_id"`
Items []Item `json:"items"`
}
В Kafka-заголовках saga_id дублируется для slog-контекста:
func sagaIDFromHeaders(headers []kafka.Header) string {
for _, h := range headers {
if h.Key == "saga_id" {
return string(h.Value)
}
}
return ""
}
func (c *SagaConsumer) handle(ctx context.Context, msg kafka.Message) error {
sagaID := sagaIDFromHeaders(msg.Headers)
ctx = slog.With(ctx, "saga_id", sagaID)
// ...
}
В таблицах сервисов — колонка saga_id с индексом: SELECT * FROM product_reservation WHERE saga_id = ? покажет, что произошло в этой саге на стороне Inventory.
Ошибки-значения в orchestrator-е
Go использует ошибки-значения, не исключения. Orchestrator обрабатывает ошибки явно и принимает решение о compensation в том же методе:
func (o *OrderSagaOrchestrator) OnPaymentFailed(ctx context.Context, tx pgx.Tx, event PaymentFailedEvent) error {
qtx := o.queries.WithTx(tx)
slog.WarnContext(ctx, "payment failed, marking saga failed",
"saga_id", event.SagaID,
"reason", event.Reason,
)
return qtx.UpdateOrderSagaStatus(ctx, db.UpdateOrderSagaStatusParams{
SagaID: event.SagaID,
Status: "failed",
CurrentStep: "terminal",
LastError: pgtype.Text{String: event.Reason, Valid: true},
})
}
Нет try/catch через несколько сервисов. Каждый failure-путь — отдельный метод orchestrator-а, вызванный по конкретному событию.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Последовательный tx1.Commit + tx2.Commit разных БД | R-DIST-SAGA-X1 | saga с локальными pgx.Tx в каждом сервисе |
| Saga без compensation-команд | R-DIST-SAGA-X2 | каждый шаг имеет парную compensation |
State саги в map[uuid.UUID]SagaState в памяти | R-DIST-SAGA-X3 | saga_order таблица в PG + ResumeInFlight при старте |
| Saga-переходы внутри UseCase Handler | R-DIST-SAGA-X4 | отдельный OrderSagaOrchestrator в internal/saga/ |
Прямой producer.WriteMessages из handler в транзакции | R-DIST-OBX-X1 | outbox.Write(ctx, tx, ...) в той же pgx.Tx |
| Choreography на 5+ шагов | R-DIST-SAGA-2 | orchestration с координатором |
Куда дальше
- Compensation — semantic state-change, не
tx.Rollback; идемпотентность через статус-проверку. - Outbox + Inbox — команды шагов саги публикуются через outbox в той же
pgx.Tx. - Idempotency — каждый шаг саги обязан быть идемпотентным;
processed_eventтаблица. - Eventual consistency — состояние after-saga распространяется асинхронно; read-your-writes.
- Distributed transactions — что не делать — почему
tx1.Commit + tx2.Commitне атомарность. - Когда нужны распределённые паттерны — saga актуальна только при cross-service операции.