Опирается на правила: R-RES-ASYNC-1R-RES-ASYNC-3 и R-RES-ASYNC-X1R-RES-ASYNC-X2 из Resilience Style Guide → раздел 11. Async и polling.

Важно знать

  • Polling внешней системы (страхование, фискализация) — через task-queue в БД, не через time.Sleep-цикл в горутине HTTP-хендлера.
  • Хендлер создаёт запись в таблице *_task (status=IN_PROGRESS, next_attempt_at=now()+5s) и возвращает 202 Accepted.
  • Scheduler (time.NewTicker, каждые 5s, FOR UPDATE SKIP LOCKED) опрашивает внешнюю систему и обновляет статус задачи.
  • При успехе — status=COMPLETED, бизнес-флоу продолжается через event или прямое обновление агрегата.
  • time.Sleep в адаптере допустим только если суммарное ожидание <2s (короткий transient backoff).
  • Для async outbound через goroutine — context.WithTimeout обязателен (R-RES-ASYNC-3); без него timeout не покрывается ни gobreaker, ни retry.Do.
  • time.Sleep-цикл в HTTP-хендлере — главный антипаттерн: держит горутину на N × iterations секунд, при нагрузке исчерпывает worker-пул.
  • time.Sleep(d) при d > 5s — запах «должно было быть task-queue».

Когда страховой партнёр принимает запрос и отвечает «ждите, мы проверяем» — первый импульс: подождать в цикле и переспросить. В HTTP-хендлере это блокирует горутину на время ожидания. Task-queue делает то же самое, но вне хендлера, с durable-гарантией и горизонтальным масштабированием.

Polling через task-queue

R-RES-ASYNC-1: классическая структура для сценария «отправили → ждём результат».

1. Схема task-таблицы

CREATE TABLE insurance_coverage_task (
    task_id          BIGSERIAL PRIMARY KEY,
    order_id         BIGINT      NOT NULL,
    external_ref     TEXT,
    status           TEXT        NOT NULL,   -- IN_PROGRESS / COMPLETED / FAILED
    retry_count      INT         NOT NULL DEFAULT 0,
    next_attempt_at  TIMESTAMPTZ NOT NULL,
    last_error       TEXT,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ix_ict_due ON insurance_coverage_task (status, next_attempt_at)
    WHERE status = 'IN_PROGRESS';

Partial index по status = 'IN_PROGRESS' исключает завершённые строки из рабочего скана.

2. Хендлер ставит задачу и возвращает 202

// core/usecase/create_order_handler.go
type CreateOrderHandler struct {
    orders    OrderRepository
    tasks     InsuranceCoverageTaskRepository
}

func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (CreateOrderResult, error) {
    order, err := NewOrder(cmd.CustomerID, cmd.Items)
    if err != nil {
        return CreateOrderResult{}, fmt.Errorf("create order: %w", err)
    }

    if err := h.orders.Save(ctx, order); err != nil {
        return CreateOrderResult{}, fmt.Errorf("save order: %w", err)
    }

    taskID, err := h.tasks.Enqueue(ctx, InsuranceCoverageTask{
        OrderID:        order.ID,
        Status:         TaskStatusInProgress,
        NextAttemptAt:  time.Now(),
    })
    if err != nil {
        return CreateOrderResult{}, fmt.Errorf("enqueue coverage task: %w", err)
    }

    return CreateOrderResult{OrderID: order.ID, TaskID: taskID, Queued: true}, nil
}

Контроллер (chi) маппит result.Queued == true в 202 Accepted с Location: /orders/{id}:

// adapters/in/http/order_handler.go
func (h *OrderHandler) CreateOrder(w http.ResponseWriter, r *http.Request) {
    var cmd CreateOrderCommand
    if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }

    result, err := h.handler.Handle(r.Context(), cmd)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    if result.Queued {
        w.Header().Set("Location", fmt.Sprintf("/orders/%d", result.OrderID))
        w.WriteHeader(http.StatusAccepted)
        json.NewEncoder(w).Encode(map[string]any{
            "status":   "queued",
            "order_id": result.OrderID,
            "task_id":  result.TaskID,
        })
        return
    }
    json.NewEncoder(w).Encode(result)
}

3. Scheduler опрашивает due-tasks

// scheduler/insurance_poll_scheduler.go
type InsurancePollScheduler struct {
    tasks     InsuranceCoverageTaskRepository
    insurance InsurancePort
    orders    OrderRepository
}

func (s *InsurancePollScheduler) Run(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            s.processDue(ctx)
        }
    }
}

func (s *InsurancePollScheduler) processDue(ctx context.Context) {
    tasks, err := s.tasks.FindDue(ctx, 50)   // FOR UPDATE SKIP LOCKED LIMIT 50
    if err != nil {
        slog.Error("insurance poll: find due tasks", "error", err)
        return
    }
    for _, t := range tasks {
        s.process(ctx, t)
    }
}

func (s *InsurancePollScheduler) process(ctx context.Context, t InsuranceCoverageTask) {
    callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    coverage, err := s.insurance.GetCoverageStatus(callCtx, t.ExternalRef)
    if err != nil {
        s.scheduleRetry(ctx, t, err.Error())
        return
    }

    if !coverage.Confirmed {
        s.scheduleRetry(ctx, t, "still pending")
        return
    }

    if err := s.tasks.MarkCompleted(ctx, t.TaskID); err != nil {
        slog.Error("mark task completed", "task_id", t.TaskID, "error", err)
        return
    }
    if err := s.orders.MarkCoverageConfirmed(ctx, t.OrderID, coverage.PolicyNumber); err != nil {
        slog.Error("mark coverage confirmed", "order_id", t.OrderID, "error", err)
    }
}

const maxRetries = 20

func (s *InsurancePollScheduler) scheduleRetry(ctx context.Context, t InsuranceCoverageTask, reason string) {
    next := t.RetryCount + 1
    if next >= maxRetries {
        if err := s.tasks.MarkFailed(ctx, t.TaskID, reason); err != nil {
            slog.Error("mark task failed", "task_id", t.TaskID, "error", err)
        }
        slog.Error("insurance coverage task exhausted", "task_id", t.TaskID, "order_id", t.OrderID, "reason", reason)
        return
    }
    delay := time.Duration(min(1<<next, 300)) * time.Second
    if err := s.tasks.ScheduleRetry(ctx, t.TaskID, next, time.Now().Add(delay), reason); err != nil {
        slog.Error("schedule retry", "task_id", t.TaskID, "error", err)
    }
}

Что важно:

  • FindDue использует FOR UPDATE SKIP LOCKED LIMIT 50 — несколько pod'ов берут разные задачи без конфликтов.
  • Exponential backoff с верхней границей 300s. После 20 попыток — FAILED + структурный лог уровня ERROR.
  • Вызов insurance.GetCoverageStatus обёрнут в context.WithTimeout — per-call deadline независим от цикла scheduler'а.

4. Реализация FindDue через sqlc/pgx

-- query.sql (sqlc)
-- name: FindDueCoverageTasks :many
SELECT * FROM insurance_coverage_task
WHERE status = 'IN_PROGRESS'
  AND next_attempt_at <= NOW()
ORDER BY next_attempt_at
LIMIT $1
FOR UPDATE SKIP LOCKED;
// adapters/out/pg/insurance_task_repository.go
func (r *InsuranceCoverageTaskRepo) FindDue(ctx context.Context, limit int) ([]InsuranceCoverageTask, error) {
    rows, err := r.q.FindDueCoverageTasks(ctx, int32(limit))
    if err != nil {
        return nil, fmt.Errorf("find due coverage tasks: %w", err)
    }
    return mapTasks(rows), nil
}

time.Sleep допустим только до 2 секунд

R-RES-ASYNC-2: time.Sleep в адаптере допустим только если суммарное ожидание меньше 2 секунд — это граница in-memory transient retry.

// adapters/out/sber/sber_adapter.go
// ОК — один короткий backoff внутри адаптера, суммарно < 2s
func (a *SberAdapter) GetPaymentStatusWithWait(ctx context.Context, ref PaymentRef) (PaymentStatus, error) {
    status, err := a.doGetStatus(ctx, ref)
    if err != nil {
        return PaymentStatus{}, err
    }
    if status.Pending {
        time.Sleep(500 * time.Millisecond)   // разовый 500ms backoff — в пределах 2s
        return a.doGetStatus(ctx, ref)
    }
    return status, nil
}

Если нужно ждать дольше — это уже task-queue. Граница жёсткая: time.Sleep(2*time.Second) в адаптере уже запах; time.Sleep(5*time.Second) — нарушение R-RES-ASYNC-X2.

context.WithTimeout для async outbound через goroutine

R-RES-ASYNC-3: если вызов уходит в goroutine (fan-out, fire-and-forget с результатом через channel) — context.WithTimeout обязателен. gobreaker и retry.Do не покрывают timeout горутины, вышедшей за пределы их выполнения.

// adapters/out/product/product_adapter.go
// async fan-out: проверяем доступность нескольких складов параллельно
func (a *ProductAdapter) CheckWarehouseAvailabilityAll(
    ctx context.Context,
    productID ProductID,
    warehouseIDs []WarehouseID,
) ([]WarehouseAvailability, error) {
    callCtx, cancel := context.WithTimeout(ctx, 8*time.Second)  // R-RES-ASYNC-3
    defer cancel()

    type result struct {
        av  WarehouseAvailability
        err error
    }
    ch := make(chan result, len(warehouseIDs))

    for _, wid := range warehouseIDs {
        wid := wid
        go func() {
            av, err := a.doCheckWarehouse(callCtx, productID, wid)
            ch <- result{av, err}
        }()
    }

    out := make([]WarehouseAvailability, 0, len(warehouseIDs))
    for range warehouseIDs {
        r := <-ch
        if r.err != nil {
            if errors.Is(r.err, context.DeadlineExceeded) {
                return nil, &ProductSystemUnavailableError{System: "product-catalog", Cause: r.err}
            }
            return nil, fmt.Errorf("warehouse check: %w", r.err)
        }
        out = append(out, r.av)
    }
    return out, nil
}

context.WithTimeout на уровне fan-out гарантирует, что все горутины завершатся в пределах 8 секунд — независимо от того, что происходит внутри doCheckWarehouse.

Что запрещено

АнтипаттернПравилоЧто взамен
time.Sleep-цикл в HTTP-хендлере или goroutine хендлераR-RES-ASYNC-X1Task-queue + scheduler с time.NewTicker
time.Sleep(d) при d > 5sR-RES-ASYNC-X2Task-queue
Goroutine-outbound без context.WithTimeoutR-RES-ASYNC-3callCtx, cancel := context.WithTimeout(ctx, d)
Polling-task без FOR UPDATE SKIP LOCKEDR-RES-ASYNC-1SKIP LOCKED для параллельных pod'ов
Возврат 200 OK за queued-операциюR-RES-ASYNC-1202 Accepted с Location
Polling без ограничения по числу попытокR-RES-ASYNC-1После N attempts → FAILED + alert

Куда дальше

  • Retry — граница in-memory retry.Do vs task-queue, RetryIf по типу ошибки.
  • Где какая защита — schedulers/outbox-relay через task-queue, не gobreaker.
  • Fallback — fallback с 202 Accepted и кешем при недоступной системе.
  • Bulkhead — semaphore.NewWeighted для параллельного ограничения вызовов.
  • Circuit breaker — gobreaker: open-state при длинном polling.
  • Timeouts — context.WithTimeout иерархия connect/read/call.
  • Health checks — probe с TTL-кешем, не business-call.
  • Observability — метрики task-queue через promauto, OTel-span на scheduler.
  • Per-system isolation — отдельный *http.Client на страховую систему.
  • Configuration — envconfig-теги для ticker-интервала и maxRetries.
  • OpenAPI generator binding — oapi-codegen для клиента страховой API.