Опирается на правила: R-DIST-EC-1R-DIST-EC-4 и R-DIST-EC-X1R-DIST-EC-X2 из Distributed Patterns Style Guide → раздел 4. Eventual consistency.

Важно знать

  • Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент должен знать, что данные могут отставать.
  • Декларация в OpenAPI — для каждого endpoint, читающего eventual-consistent данные, в description указать ожидаемую задержку (R-DIST-EC-1).
  • Read-your-writes в Go — четыре стратегии: polling в клиенте, synchronous wait в handler, write-side endpoint, sticky session. Ни одна не работает «сама по себе» (R-DIST-EC-2).
  • Bounded staleness SLO — метрика read_projection_lag_seconds в Prometheus + алерт на p99 > 5 секунд (R-DIST-EC-3).
  • Causal consistency через version-поле: Kafka consumer применяет событие только если event.Version > current_version, иначе идемпотентный skip (R-DIST-EC-4).
  • Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает stale data, теряет доверие к API (R-DIST-EC-X1).
  • 2PC для немедленной согласованности — не масштабируется; в Go нет стандартного XA-менеджера; либо EC, либо redesign boundary (R-DIST-EC-X2).
  • Проверка version-поля — атомарно в той же pgx.Tx, что и upsert проекции: нет race condition между проверкой и применением.

В распределённой системе теорема CAP вынуждает выбирать. UCP-стек выбирает доступность и устойчивость к разделению — и принимает eventual consistency как норму. Задача не избежать EC, а сделать его явным: задекларировать, измерить, защитить порядок событий.

Go-особенность: нет транзакционного менеджера с аннотациями, нет @KafkaListener с @Transactional. Вместо них — явный pgx.Tx, пробрасываемый через параметры, и Kafka consumer как обычная функция. Это делает EC-механику более читаемой: каждый constraint виден в коде, а не спрятан за аспектом.

Декларация в OpenAPI

R-DIST-EC-1: для endpoint, читающего из read-проекции, в OpenAPI description указываем ожидаемую задержку.

# openapi.yaml
/customers/{customerId}/orders:
  get:
    summary: Список заказов клиента
    description: |
      Возвращает summary заказов из денормализованной read-проекции.

      **Eventual consistency**: задержка от write в order-service до появления
      здесь обычно < 1 секунды, p99 < 5 секунд. Если нужна немедленная
      согласованность после POST /orders — использовать GET /orders/{id},
      этот endpoint читает write-side напрямую.
    parameters:
      - name: customerId
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Список заказов (могут отставать на 1–5 секунд)

В chi-маршрутизаторе комментарий для swaggo/swag — в godoc над handler-функцией:

// adapters/in/http/order_handler.go

// GetCustomerOrders godoc
// @Summary Список заказов клиента
// @Description Возвращает summary заказов из read-проекции.\n\n**Eventual consistency**: задержка < 1s, p99 < 5s. Для immediate consistency после POST /orders — GET /orders/{id}.
// @Tags orders
// @Produce json
// @Param customerId path string true "UUID клиента"
// @Success 200 {array} OrderSummaryResponse
// @Router /customers/{customerId}/orders [get]
func (h *OrderHandler) GetCustomerOrders(w http.ResponseWriter, r *http.Request) {
    customerID, err := uuid.Parse(chi.URLParam(r, "customerId"))
    if err != nil {
        httperr.Write(w, r, &InvalidParamError{Param: "customerId", Err: err})
        return
    }

    summaries, err := h.queries.GetOrderSummariesByCustomer(r.Context(), customerID)
    if err != nil {
        httperr.Write(w, r, fmt.Errorf("get order summaries: %w", err))
        return
    }

    render.JSON(w, r, summaries)
}

Без декларации разработчик клиента напишет тест POST /orders + сразу GET /customers/{id}/orders и откроет баг-репорт, когда заказ не появился в проекции.

Read-your-writes

R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача. Четыре стратегии:

Polling в клиенте

Клиент делает POST /orders, получает orderId, затем polling GET /orders/{id} из проекции:

// Клиентский код — Go SDK для order-service
func (c *OrderClient) CreateAndWait(ctx context.Context, req CreateOrderRequest) (*OrderSummary, error) {
    created, err := c.Create(ctx, req)
    if err != nil {
        return nil, err
    }

    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    deadline := time.Now().Add(4 * time.Second)

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case t := <-ticker.C:
            if t.After(deadline) {
                return nil, &ReadModelTimeoutError{OrderID: created.OrderID}
            }
            summary, err := c.GetSummary(ctx, created.OrderID)
            if err == nil {
                return summary, nil
            }
            if !errors.Is(err, ErrNotFound) {
                return nil, err
            }
        }
    }
}

Применимо для UI с прогресс-индикатором: клиент платит latency, сервер не держит соединение.

Synchronous wait в handler

Handler после write выполняет короткий polling read-model на стороне сервера:

// adapters/in/http/order_handler.go

func (h *OrderHandler) CreateOrder(w http.ResponseWriter, r *http.Request) {
    var req CreateOrderRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        httperr.Write(w, r, &DecodeError{Err: err})
        return
    }

    order, err := h.useCase.Execute(r.Context(), req.toCommand())
    if err != nil {
        httperr.Write(w, r, err)
        return
    }

    summary, err := h.waitForProjection(r.Context(), order.ID, 2*time.Second)
    if err != nil {
        slog.WarnContext(r.Context(), "projection not ready, falling back to write-side",
            "order_id", order.ID,
            "error", err,
        )
        summary = orderSummaryFromWriteSide(order)
    }

    w.WriteHeader(http.StatusCreated)
    render.JSON(w, r, summary)
}

func (h *OrderHandler) waitForProjection(ctx context.Context, orderID uuid.UUID, timeout time.Duration) (*OrderSummaryResponse, error) {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        row, err := h.queries.FindOrderSummary(ctx, orderID)
        if err == nil {
            return toOrderSummaryResponse(row), nil
        }
        if !errors.Is(err, pgx.ErrNoRows) {
            return nil, fmt.Errorf("find order summary: %w", err)
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }
    return nil, &ProjectionTimeoutError{OrderID: orderID}
}

Подходит для критичных flow: создание заказа → сразу показать summary. Fallback на write-side защищает от timeout projection.

Write-side endpoint

Два endpoint с разной семантикой — рабочая комбинация:

POST /orders                        → CreateOrderHandler (write)
GET /orders/{id}                    → write-side, immediate consistency
GET /customers/{id}/orders          → read-projection, eventual consistency
// adapters/in/http/router.go
func NewRouter(orderH *OrderHandler, customerH *CustomerHandler) chi.Router {
    r := chi.NewRouter()

    r.Post("/orders", orderH.CreateOrder)
    r.Get("/orders/{id}", orderH.GetOrderByID)           // write-side

    r.Get("/customers/{id}/orders", customerH.GetOrders) // read-projection (EC)

    return r
}
// GetOrderByID — читает из write-side (полный агрегат из orders-таблицы)
func (h *OrderHandler) GetOrderByID(w http.ResponseWriter, r *http.Request) {
    orderID, err := uuid.Parse(chi.URLParam(r, "id"))
    if err != nil {
        httperr.Write(w, r, &InvalidParamError{Param: "id", Err: err})
        return
    }

    row, err := h.queries.GetOrderByID(r.Context(), orderID)
    if errors.Is(err, pgx.ErrNoRows) {
        httperr.Write(w, r, &NotFoundError{Entity: "order", ID: orderID.String()})
        return
    }
    if err != nil {
        httperr.Write(w, r, fmt.Errorf("get order: %w", err))
        return
    }

    render.JSON(w, r, toOrderDetailResponse(row))
}

Read-проекция держит нагрузку; write-side endpoint обслуживает «сразу после write».

Bounded staleness SLO

R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку. Измеряем через Prometheus.

// adapters/in/kafka/metrics.go
package kafka

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var projectionLag = promauto.NewHistogramVec(prometheus.HistogramOpts{
    Name:    "read_projection_lag_seconds",
    Help:    "Задержка между timestamp события и применением к read-проекции",
    Buckets: prometheus.DefBuckets,
}, []string{"model", "event_type"})
// adapters/in/kafka/order_consumer.go

func (c *OrderConsumer) handleOrderEvent(ctx context.Context, tx pgx.Tx, msg kafka.Message) error {
    var event OrderEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal order event: %w", err)
    }

    projectionLag.WithLabelValues("order_summary", event.EventType).
        Observe(time.Since(event.OccurredAt).Seconds())

    return c.projection.Apply(ctx, tx, event)
}

Алерт в Prometheus:

# alerts/distributed.yml
groups:
  - name: distributed-patterns
    rules:
      - alert: ReadProjectionStalenessHigh
        expr: >
          histogram_quantile(0.99,
            sum by (le, model) (rate(read_projection_lag_seconds_bucket[5m]))
          ) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Read-проекция {{ $labels.model }} отстаёт > 5 секунд (p99)"

Без SLO read-model тихо отстаёт на час, и команда узнаёт об этом от клиентов.

Causal consistency через version

R-DIST-EC-4: когда порядок событий важен, receiver проверяет монотонность version-поля и пропускает out-of-order события.

// core/order/projection.go
package order

type OrderProjection struct {
    queries *db.Queries
}

func NewOrderProjection(queries *db.Queries) *OrderProjection {
    return &OrderProjection{queries: queries}
}

func (p *OrderProjection) Apply(ctx context.Context, tx pgx.Tx, event OrderEvent) error {
    qtx := p.queries.WithTx(tx)

    current, err := qtx.GetOrderProjectionVersion(ctx, event.OrderID)
    if err != nil && !errors.Is(err, pgx.ErrNoRows) {
        return fmt.Errorf("get projection version: %w", err)
    }

    if err == nil && event.Version <= current {
        slog.DebugContext(ctx, "skipping out-of-order event",
            "order_id", event.OrderID,
            "event_version", event.Version,
            "current_version", current,
        )
        return nil
    }

    return qtx.UpsertOrderProjection(ctx, db.UpsertOrderProjectionParams{
        OrderID:    event.OrderID,
        Version:    event.Version,
        Status:     string(event.NewStatus),
        CustomerID: event.CustomerID,
        TotalRub:   event.TotalRub,
        UpdatedAt:  event.OccurredAt,
    })
}
-- db/queries/order_projection.sql

-- name: GetOrderProjectionVersion :one
SELECT version FROM order_projection WHERE order_id = $1;

-- name: UpsertOrderProjection :exec
INSERT INTO order_projection (order_id, version, status, customer_id, total_rub, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (order_id) DO UPDATE
    SET version    = EXCLUDED.version,
        status     = EXCLUDED.status,
        total_rub  = EXCLUDED.total_rub,
        updated_at = EXCLUDED.updated_at
WHERE order_projection.version < EXCLUDED.version;

WHERE order_projection.version < EXCLUDED.version в ON CONFLICT DO UPDATE — дополнительная защита на уровне БД: даже если два consumer-а обработают событие одновременно, применится только бо́льшая версия.

Scalar version на агрегат покрывает большинство задач; vector clock нужен редко — только когда несколько write-source независимо изменяют разные части агрегата.

Что запрещено

АнтипаттернПравилоЧто взамен
Endpoint возвращает stale data без декларации в OpenAPIR-DIST-EC-X1description с ожидаемой задержкой и ссылкой на write-side endpoint
2PC через два pgx.Tx разных сервисов для немедленной согласованностиR-DIST-EC-X2redesign boundary → один сервис + одна pgx.Tx, либо принять EC
Read-model без SLO и без метрики задержкиR-DIST-EC-3read_projection_lag_seconds + Prometheus-алерт
Out-of-order event применяется без проверки versionR-DIST-EC-4event.Version > current_version перед upsert; WHERE version < EXCLUDED.version в SQL
Polling без timeout и fallbackR-DIST-EC-2deadline + fallback на write-side response
Read-your-writes через polling на каждый GET под нагрузкойR-DIST-EC-2отдельный write-side endpoint только для «сразу после write»; проекция держит нагрузку

Куда дальше

  • Outbox и Inbox — главный механизм синхронизации read-проекции: атомарная запись события с бизнес-транзакцией.
  • Idempotency — consumer read-проекции должен быть идемпотентным при retry и rebalance.
  • Saga — saga_id сквозной через все шаги; in-flight саги — отдельный случай eventual consistency.
  • Compensation — semantic compensation через статус cancelled, а не DELETE; audit trail обязателен.
  • Distributed transactions — почему 2PC/XA не вариант в Go-стеке.
  • When to use distributed patterns — когда EC и распределённые паттерны нужны, когда достаточно одной pgx.Tx.