Опирается на правила:
R-DIST-EC-1…R-DIST-EC-4иR-DIST-EC-X1…R-DIST-EC-X2из Distributed Patterns Style Guide → раздел 4. Eventual consistency.
Важно знать
- Eventual consistency — норма в распределённой системе, но требует явного контракта: клиент должен знать, что данные могут отставать.
- Декларация в OpenAPI — для каждого endpoint, читающего eventual-consistent данные, в
descriptionуказать ожидаемую задержку (R-DIST-EC-1).- Read-your-writes в Go — четыре стратегии: polling в клиенте, synchronous wait в handler, write-side endpoint, sticky session. Ни одна не работает «сама по себе» (
R-DIST-EC-2).- Bounded staleness SLO — метрика
read_projection_lag_secondsв Prometheus + алерт на p99 > 5 секунд (R-DIST-EC-3).- Causal consistency через
version-поле: Kafka consumer применяет событие только еслиevent.Version > current_version, иначе идемпотентный skip (R-DIST-EC-4).- Молчаливая EC — главный антипаттерн: клиент делает write, сразу читает, получает stale data, теряет доверие к API (
R-DIST-EC-X1).- 2PC для немедленной согласованности — не масштабируется; в Go нет стандартного XA-менеджера; либо EC, либо redesign boundary (
R-DIST-EC-X2).- Проверка version-поля — атомарно в той же
pgx.Tx, что и upsert проекции: нет race condition между проверкой и применением.
В распределённой системе теорема CAP вынуждает выбирать. UCP-стек выбирает доступность и устойчивость к разделению — и принимает eventual consistency как норму. Задача не избежать EC, а сделать его явным: задекларировать, измерить, защитить порядок событий.
Go-особенность: нет транзакционного менеджера с аннотациями, нет @KafkaListener с @Transactional. Вместо них — явный pgx.Tx, пробрасываемый через параметры, и Kafka consumer как обычная функция. Это делает EC-механику более читаемой: каждый constraint виден в коде, а не спрятан за аспектом.
Декларация в OpenAPI
R-DIST-EC-1: для endpoint, читающего из read-проекции, в OpenAPI description указываем ожидаемую задержку.
# openapi.yaml
/customers/{customerId}/orders:
get:
summary: Список заказов клиента
description: |
Возвращает summary заказов из денормализованной read-проекции.
**Eventual consistency**: задержка от write в order-service до появления
здесь обычно < 1 секунды, p99 < 5 секунд. Если нужна немедленная
согласованность после POST /orders — использовать GET /orders/{id},
этот endpoint читает write-side напрямую.
parameters:
- name: customerId
in: path
required: true
schema:
type: string
format: uuid
responses:
"200":
description: Список заказов (могут отставать на 1–5 секунд)
В chi-маршрутизаторе комментарий для swaggo/swag — в godoc над handler-функцией:
// adapters/in/http/order_handler.go
// GetCustomerOrders godoc
// @Summary Список заказов клиента
// @Description Возвращает summary заказов из read-проекции.\n\n**Eventual consistency**: задержка < 1s, p99 < 5s. Для immediate consistency после POST /orders — GET /orders/{id}.
// @Tags orders
// @Produce json
// @Param customerId path string true "UUID клиента"
// @Success 200 {array} OrderSummaryResponse
// @Router /customers/{customerId}/orders [get]
func (h *OrderHandler) GetCustomerOrders(w http.ResponseWriter, r *http.Request) {
customerID, err := uuid.Parse(chi.URLParam(r, "customerId"))
if err != nil {
httperr.Write(w, r, &InvalidParamError{Param: "customerId", Err: err})
return
}
summaries, err := h.queries.GetOrderSummariesByCustomer(r.Context(), customerID)
if err != nil {
httperr.Write(w, r, fmt.Errorf("get order summaries: %w", err))
return
}
render.JSON(w, r, summaries)
}
Без декларации разработчик клиента напишет тест POST /orders + сразу GET /customers/{id}/orders и откроет баг-репорт, когда заказ не появился в проекции.
Read-your-writes
R-DIST-EC-2: «клиент после своего write сразу читает свой результат» — отдельная задача. Четыре стратегии:
Polling в клиенте
Клиент делает POST /orders, получает orderId, затем polling GET /orders/{id} из проекции:
// Клиентский код — Go SDK для order-service
func (c *OrderClient) CreateAndWait(ctx context.Context, req CreateOrderRequest) (*OrderSummary, error) {
created, err := c.Create(ctx, req)
if err != nil {
return nil, err
}
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
deadline := time.Now().Add(4 * time.Second)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case t := <-ticker.C:
if t.After(deadline) {
return nil, &ReadModelTimeoutError{OrderID: created.OrderID}
}
summary, err := c.GetSummary(ctx, created.OrderID)
if err == nil {
return summary, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, err
}
}
}
}
Применимо для UI с прогресс-индикатором: клиент платит latency, сервер не держит соединение.
Synchronous wait в handler
Handler после write выполняет короткий polling read-model на стороне сервера:
// adapters/in/http/order_handler.go
func (h *OrderHandler) CreateOrder(w http.ResponseWriter, r *http.Request) {
var req CreateOrderRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httperr.Write(w, r, &DecodeError{Err: err})
return
}
order, err := h.useCase.Execute(r.Context(), req.toCommand())
if err != nil {
httperr.Write(w, r, err)
return
}
summary, err := h.waitForProjection(r.Context(), order.ID, 2*time.Second)
if err != nil {
slog.WarnContext(r.Context(), "projection not ready, falling back to write-side",
"order_id", order.ID,
"error", err,
)
summary = orderSummaryFromWriteSide(order)
}
w.WriteHeader(http.StatusCreated)
render.JSON(w, r, summary)
}
func (h *OrderHandler) waitForProjection(ctx context.Context, orderID uuid.UUID, timeout time.Duration) (*OrderSummaryResponse, error) {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
row, err := h.queries.FindOrderSummary(ctx, orderID)
if err == nil {
return toOrderSummaryResponse(row), nil
}
if !errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("find order summary: %w", err)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond):
}
}
return nil, &ProjectionTimeoutError{OrderID: orderID}
}
Подходит для критичных flow: создание заказа → сразу показать summary. Fallback на write-side защищает от timeout projection.
Write-side endpoint
Два endpoint с разной семантикой — рабочая комбинация:
POST /orders → CreateOrderHandler (write)
GET /orders/{id} → write-side, immediate consistency
GET /customers/{id}/orders → read-projection, eventual consistency
// adapters/in/http/router.go
func NewRouter(orderH *OrderHandler, customerH *CustomerHandler) chi.Router {
r := chi.NewRouter()
r.Post("/orders", orderH.CreateOrder)
r.Get("/orders/{id}", orderH.GetOrderByID) // write-side
r.Get("/customers/{id}/orders", customerH.GetOrders) // read-projection (EC)
return r
}
// GetOrderByID — читает из write-side (полный агрегат из orders-таблицы)
func (h *OrderHandler) GetOrderByID(w http.ResponseWriter, r *http.Request) {
orderID, err := uuid.Parse(chi.URLParam(r, "id"))
if err != nil {
httperr.Write(w, r, &InvalidParamError{Param: "id", Err: err})
return
}
row, err := h.queries.GetOrderByID(r.Context(), orderID)
if errors.Is(err, pgx.ErrNoRows) {
httperr.Write(w, r, &NotFoundError{Entity: "order", ID: orderID.String()})
return
}
if err != nil {
httperr.Write(w, r, fmt.Errorf("get order: %w", err))
return
}
render.JSON(w, r, toOrderDetailResponse(row))
}
Read-проекция держит нагрузку; write-side endpoint обслуживает «сразу после write».
Bounded staleness SLO
R-DIST-EC-3: у каждой read-model явный SLO на максимальную задержку. Измеряем через Prometheus.
// adapters/in/kafka/metrics.go
package kafka
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var projectionLag = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "read_projection_lag_seconds",
Help: "Задержка между timestamp события и применением к read-проекции",
Buckets: prometheus.DefBuckets,
}, []string{"model", "event_type"})
// adapters/in/kafka/order_consumer.go
func (c *OrderConsumer) handleOrderEvent(ctx context.Context, tx pgx.Tx, msg kafka.Message) error {
var event OrderEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal order event: %w", err)
}
projectionLag.WithLabelValues("order_summary", event.EventType).
Observe(time.Since(event.OccurredAt).Seconds())
return c.projection.Apply(ctx, tx, event)
}
Алерт в Prometheus:
# alerts/distributed.yml
groups:
- name: distributed-patterns
rules:
- alert: ReadProjectionStalenessHigh
expr: >
histogram_quantile(0.99,
sum by (le, model) (rate(read_projection_lag_seconds_bucket[5m]))
) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "Read-проекция {{ $labels.model }} отстаёт > 5 секунд (p99)"
Без SLO read-model тихо отстаёт на час, и команда узнаёт об этом от клиентов.
Causal consistency через version
R-DIST-EC-4: когда порядок событий важен, receiver проверяет монотонность version-поля и пропускает out-of-order события.
// core/order/projection.go
package order
type OrderProjection struct {
queries *db.Queries
}
func NewOrderProjection(queries *db.Queries) *OrderProjection {
return &OrderProjection{queries: queries}
}
func (p *OrderProjection) Apply(ctx context.Context, tx pgx.Tx, event OrderEvent) error {
qtx := p.queries.WithTx(tx)
current, err := qtx.GetOrderProjectionVersion(ctx, event.OrderID)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("get projection version: %w", err)
}
if err == nil && event.Version <= current {
slog.DebugContext(ctx, "skipping out-of-order event",
"order_id", event.OrderID,
"event_version", event.Version,
"current_version", current,
)
return nil
}
return qtx.UpsertOrderProjection(ctx, db.UpsertOrderProjectionParams{
OrderID: event.OrderID,
Version: event.Version,
Status: string(event.NewStatus),
CustomerID: event.CustomerID,
TotalRub: event.TotalRub,
UpdatedAt: event.OccurredAt,
})
}
-- db/queries/order_projection.sql
-- name: GetOrderProjectionVersion :one
SELECT version FROM order_projection WHERE order_id = $1;
-- name: UpsertOrderProjection :exec
INSERT INTO order_projection (order_id, version, status, customer_id, total_rub, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (order_id) DO UPDATE
SET version = EXCLUDED.version,
status = EXCLUDED.status,
total_rub = EXCLUDED.total_rub,
updated_at = EXCLUDED.updated_at
WHERE order_projection.version < EXCLUDED.version;
WHERE order_projection.version < EXCLUDED.version в ON CONFLICT DO UPDATE — дополнительная защита на уровне БД: даже если два consumer-а обработают событие одновременно, применится только бо́льшая версия.
Scalar version на агрегат покрывает большинство задач; vector clock нужен редко — только когда несколько write-source независимо изменяют разные части агрегата.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Endpoint возвращает stale data без декларации в OpenAPI | R-DIST-EC-X1 | description с ожидаемой задержкой и ссылкой на write-side endpoint |
2PC через два pgx.Tx разных сервисов для немедленной согласованности | R-DIST-EC-X2 | redesign boundary → один сервис + одна pgx.Tx, либо принять EC |
| Read-model без SLO и без метрики задержки | R-DIST-EC-3 | read_projection_lag_seconds + Prometheus-алерт |
| Out-of-order event применяется без проверки version | R-DIST-EC-4 | event.Version > current_version перед upsert; WHERE version < EXCLUDED.version в SQL |
| Polling без timeout и fallback | R-DIST-EC-2 | deadline + fallback на write-side response |
| Read-your-writes через polling на каждый GET под нагрузкой | R-DIST-EC-2 | отдельный write-side endpoint только для «сразу после write»; проекция держит нагрузку |
Куда дальше
- Outbox и Inbox — главный механизм синхронизации read-проекции: атомарная запись события с бизнес-транзакцией.
- Idempotency — consumer read-проекции должен быть идемпотентным при retry и rebalance.
- Saga —
saga_idсквозной через все шаги; in-flight саги — отдельный случай eventual consistency. - Compensation — semantic compensation через статус
cancelled, а не DELETE; audit trail обязателен. - Distributed transactions — почему 2PC/XA не вариант в Go-стеке.
- When to use distributed patterns — когда EC и распределённые паттерны нужны, когда достаточно одной
pgx.Tx.