Опирается на правила:
R-RES-ASYNC-1…R-RES-ASYNC-3иR-RES-ASYNC-X1…R-RES-ASYNC-X2из Resilience Style Guide → раздел 11. Async и polling.
Важно знать
- Polling внешней системы (страхование, фискализация) — через task-queue в БД, не через
time.Sleep-цикл в горутине HTTP-хендлера.- Хендлер создаёт запись в таблице
*_task(status=IN_PROGRESS,next_attempt_at=now()+5s) и возвращает 202 Accepted.- Scheduler (
time.NewTicker, каждые 5s,FOR UPDATE SKIP LOCKED) опрашивает внешнюю систему и обновляет статус задачи.- При успехе —
status=COMPLETED, бизнес-флоу продолжается через event или прямое обновление агрегата.time.Sleepв адаптере допустим только если суммарное ожидание<2s(короткий transient backoff).- Для async outbound через goroutine —
context.WithTimeoutобязателен (R-RES-ASYNC-3); без него timeout не покрывается ниgobreaker, ниretry.Do.time.Sleep-цикл в HTTP-хендлере — главный антипаттерн: держит горутину на N × iterations секунд, при нагрузке исчерпывает worker-пул.time.Sleep(d)приd > 5s— запах «должно было быть task-queue».
Когда страховой партнёр принимает запрос и отвечает «ждите, мы проверяем» — первый импульс: подождать в цикле и переспросить. В HTTP-хендлере это блокирует горутину на время ожидания. Task-queue делает то же самое, но вне хендлера, с durable-гарантией и горизонтальным масштабированием.
Polling через task-queue
R-RES-ASYNC-1: классическая структура для сценария «отправили → ждём результат».
1. Схема task-таблицы
CREATE TABLE insurance_coverage_task (
task_id BIGSERIAL PRIMARY KEY,
order_id BIGINT NOT NULL,
external_ref TEXT,
status TEXT NOT NULL, -- IN_PROGRESS / COMPLETED / FAILED
retry_count INT NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ix_ict_due ON insurance_coverage_task (status, next_attempt_at)
WHERE status = 'IN_PROGRESS';
Partial index по status = 'IN_PROGRESS' исключает завершённые строки из рабочего скана.
2. Хендлер ставит задачу и возвращает 202
// core/usecase/create_order_handler.go
type CreateOrderHandler struct {
orders OrderRepository
tasks InsuranceCoverageTaskRepository
}
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) (CreateOrderResult, error) {
order, err := NewOrder(cmd.CustomerID, cmd.Items)
if err != nil {
return CreateOrderResult{}, fmt.Errorf("create order: %w", err)
}
if err := h.orders.Save(ctx, order); err != nil {
return CreateOrderResult{}, fmt.Errorf("save order: %w", err)
}
taskID, err := h.tasks.Enqueue(ctx, InsuranceCoverageTask{
OrderID: order.ID,
Status: TaskStatusInProgress,
NextAttemptAt: time.Now(),
})
if err != nil {
return CreateOrderResult{}, fmt.Errorf("enqueue coverage task: %w", err)
}
return CreateOrderResult{OrderID: order.ID, TaskID: taskID, Queued: true}, nil
}
Контроллер (chi) маппит result.Queued == true в 202 Accepted с Location: /orders/{id}:
// adapters/in/http/order_handler.go
func (h *OrderHandler) CreateOrder(w http.ResponseWriter, r *http.Request) {
var cmd CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
result, err := h.handler.Handle(r.Context(), cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if result.Queued {
w.Header().Set("Location", fmt.Sprintf("/orders/%d", result.OrderID))
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]any{
"status": "queued",
"order_id": result.OrderID,
"task_id": result.TaskID,
})
return
}
json.NewEncoder(w).Encode(result)
}
3. Scheduler опрашивает due-tasks
// scheduler/insurance_poll_scheduler.go
type InsurancePollScheduler struct {
tasks InsuranceCoverageTaskRepository
insurance InsurancePort
orders OrderRepository
}
func (s *InsurancePollScheduler) Run(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.processDue(ctx)
}
}
}
func (s *InsurancePollScheduler) processDue(ctx context.Context) {
tasks, err := s.tasks.FindDue(ctx, 50) // FOR UPDATE SKIP LOCKED LIMIT 50
if err != nil {
slog.Error("insurance poll: find due tasks", "error", err)
return
}
for _, t := range tasks {
s.process(ctx, t)
}
}
func (s *InsurancePollScheduler) process(ctx context.Context, t InsuranceCoverageTask) {
callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
coverage, err := s.insurance.GetCoverageStatus(callCtx, t.ExternalRef)
if err != nil {
s.scheduleRetry(ctx, t, err.Error())
return
}
if !coverage.Confirmed {
s.scheduleRetry(ctx, t, "still pending")
return
}
if err := s.tasks.MarkCompleted(ctx, t.TaskID); err != nil {
slog.Error("mark task completed", "task_id", t.TaskID, "error", err)
return
}
if err := s.orders.MarkCoverageConfirmed(ctx, t.OrderID, coverage.PolicyNumber); err != nil {
slog.Error("mark coverage confirmed", "order_id", t.OrderID, "error", err)
}
}
const maxRetries = 20
func (s *InsurancePollScheduler) scheduleRetry(ctx context.Context, t InsuranceCoverageTask, reason string) {
next := t.RetryCount + 1
if next >= maxRetries {
if err := s.tasks.MarkFailed(ctx, t.TaskID, reason); err != nil {
slog.Error("mark task failed", "task_id", t.TaskID, "error", err)
}
slog.Error("insurance coverage task exhausted", "task_id", t.TaskID, "order_id", t.OrderID, "reason", reason)
return
}
delay := time.Duration(min(1<<next, 300)) * time.Second
if err := s.tasks.ScheduleRetry(ctx, t.TaskID, next, time.Now().Add(delay), reason); err != nil {
slog.Error("schedule retry", "task_id", t.TaskID, "error", err)
}
}
Что важно:
FindDueиспользуетFOR UPDATE SKIP LOCKED LIMIT 50— несколько pod'ов берут разные задачи без конфликтов.- Exponential backoff с верхней границей 300s. После 20 попыток —
FAILED+ структурный лог уровня ERROR. - Вызов
insurance.GetCoverageStatusобёрнут вcontext.WithTimeout— per-call deadline независим от цикла scheduler'а.
4. Реализация FindDue через sqlc/pgx
-- query.sql (sqlc)
-- name: FindDueCoverageTasks :many
SELECT * FROM insurance_coverage_task
WHERE status = 'IN_PROGRESS'
AND next_attempt_at <= NOW()
ORDER BY next_attempt_at
LIMIT $1
FOR UPDATE SKIP LOCKED;
// adapters/out/pg/insurance_task_repository.go
func (r *InsuranceCoverageTaskRepo) FindDue(ctx context.Context, limit int) ([]InsuranceCoverageTask, error) {
rows, err := r.q.FindDueCoverageTasks(ctx, int32(limit))
if err != nil {
return nil, fmt.Errorf("find due coverage tasks: %w", err)
}
return mapTasks(rows), nil
}
time.Sleep допустим только до 2 секунд
R-RES-ASYNC-2: time.Sleep в адаптере допустим только если суммарное ожидание меньше 2 секунд — это граница in-memory transient retry.
// adapters/out/sber/sber_adapter.go
// ОК — один короткий backoff внутри адаптера, суммарно < 2s
func (a *SberAdapter) GetPaymentStatusWithWait(ctx context.Context, ref PaymentRef) (PaymentStatus, error) {
status, err := a.doGetStatus(ctx, ref)
if err != nil {
return PaymentStatus{}, err
}
if status.Pending {
time.Sleep(500 * time.Millisecond) // разовый 500ms backoff — в пределах 2s
return a.doGetStatus(ctx, ref)
}
return status, nil
}
Если нужно ждать дольше — это уже task-queue. Граница жёсткая: time.Sleep(2*time.Second) в адаптере уже запах; time.Sleep(5*time.Second) — нарушение R-RES-ASYNC-X2.
context.WithTimeout для async outbound через goroutine
R-RES-ASYNC-3: если вызов уходит в goroutine (fan-out, fire-and-forget с результатом через channel) — context.WithTimeout обязателен. gobreaker и retry.Do не покрывают timeout горутины, вышедшей за пределы их выполнения.
// adapters/out/product/product_adapter.go
// async fan-out: проверяем доступность нескольких складов параллельно
func (a *ProductAdapter) CheckWarehouseAvailabilityAll(
ctx context.Context,
productID ProductID,
warehouseIDs []WarehouseID,
) ([]WarehouseAvailability, error) {
callCtx, cancel := context.WithTimeout(ctx, 8*time.Second) // R-RES-ASYNC-3
defer cancel()
type result struct {
av WarehouseAvailability
err error
}
ch := make(chan result, len(warehouseIDs))
for _, wid := range warehouseIDs {
wid := wid
go func() {
av, err := a.doCheckWarehouse(callCtx, productID, wid)
ch <- result{av, err}
}()
}
out := make([]WarehouseAvailability, 0, len(warehouseIDs))
for range warehouseIDs {
r := <-ch
if r.err != nil {
if errors.Is(r.err, context.DeadlineExceeded) {
return nil, &ProductSystemUnavailableError{System: "product-catalog", Cause: r.err}
}
return nil, fmt.Errorf("warehouse check: %w", r.err)
}
out = append(out, r.av)
}
return out, nil
}
context.WithTimeout на уровне fan-out гарантирует, что все горутины завершатся в пределах 8 секунд — независимо от того, что происходит внутри doCheckWarehouse.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
time.Sleep-цикл в HTTP-хендлере или goroutine хендлера | R-RES-ASYNC-X1 | Task-queue + scheduler с time.NewTicker |
time.Sleep(d) при d > 5s | R-RES-ASYNC-X2 | Task-queue |
Goroutine-outbound без context.WithTimeout | R-RES-ASYNC-3 | callCtx, cancel := context.WithTimeout(ctx, d) |
Polling-task без FOR UPDATE SKIP LOCKED | R-RES-ASYNC-1 | SKIP LOCKED для параллельных pod'ов |
Возврат 200 OK за queued-операцию | R-RES-ASYNC-1 | 202 Accepted с Location |
| Polling без ограничения по числу попыток | R-RES-ASYNC-1 | После N attempts → FAILED + alert |
Куда дальше
- Retry — граница in-memory
retry.Dovs task-queue,RetryIfпо типу ошибки. - Где какая защита — schedulers/outbox-relay через task-queue, не
gobreaker. - Fallback — fallback с 202 Accepted и кешем при недоступной системе.
- Bulkhead —
semaphore.NewWeightedдля параллельного ограничения вызовов. - Circuit breaker —
gobreaker: open-state при длинном polling. - Timeouts —
context.WithTimeoutиерархия connect/read/call. - Health checks — probe с TTL-кешем, не business-call.
- Observability — метрики task-queue через
promauto, OTel-span на scheduler. - Per-system isolation — отдельный
*http.Clientна страховую систему. - Configuration —
envconfig-теги дляticker-интервала иmaxRetries. - OpenAPI generator binding —
oapi-codegenдля клиента страховой API.