Опирается на правила: R-RES-ASYNC-1R-RES-ASYNC-3 и R-RES-ASYNC-X1R-RES-ASYNC-X2 из Resilience Style Guide → раздел 11. Async и polling.

Важно знать

  • Polling внешней системы — через task-queue в БД, не asyncio.sleep-цикл в handler.
  • Handler создаёт <X>PollingTask (status=IN_PROGRESS, next_attempt_at=now()+5s), возвращает 202 Accepted.
  • Scheduler (asyncio-loop или APScheduler каждые 5s, FOR UPDATE SKIP LOCKED) дёргает внешнюю систему, обновляет статус.
  • При успехе — status=COMPLETED, продолжение бизнес-флоу через event/saga.
  • В методе адаптера asyncio.sleep допустим только если total wait <2s (короткий фиксированный backoff).
  • Для async outbound — asyncio.timeout() / asyncio.wait_for обязателен (R-RES-ASYNC-3).
  • asyncio.sleep-цикл в handler — главный антипаттерн: держит ASGI-таску, исчерпывает воркеры event-loop.
  • asyncio.sleep > 5s — запах «должно быть task-queue».

Когда внешняя система не отвечает мгновенно — страховая асинхронно подтверждает полис, платёжный провайдер требует polling статуса по payment_id — соблазн велик: написать while True: await asyncio.sleep(3); status = await adapter.get_status(...). В sync-handler это убивает воркеры event-loop. Task-queue — стандартный способ сделать это правильно.

Polling через task-queue

R-RES-ASYNC-1: классическая структура для сценария с polling.

1. Схема task-таблицы

CREATE TABLE order_confirmation_task (
    task_id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    order_id         BIGINT NOT NULL,
    external_id      TEXT,
    status           TEXT NOT NULL,          -- PENDING / IN_PROGRESS / COMPLETED / FAILED
    retry_count      INTEGER NOT NULL DEFAULT 0,
    next_attempt_at  TIMESTAMPTZ NOT NULL,
    last_error       TEXT,
    payload          JSONB NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
    WHERE status IN ('PENDING', 'IN_PROGRESS');

WHERE в partial-индексе — COMPLETED и FAILED не участвуют в сканировании due-строк.

2. Use Case handler ставит задачу

# application/use_cases/create_order.py
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass

@dataclass(frozen=True)
class CreateOrderCommand:
    customer_id: int
    items: list[OrderItem]

@dataclass(frozen=True)
class CreateOrderResult:
    order_id: int
    task_id: int
    status: str = "queued"

class CreateOrderHandler:
    def __init__(
        self,
        order_repo: OrderRepository,
        task_repo: OrderConfirmationTaskRepository,
    ) -> None:
        self._orders = order_repo
        self._tasks = task_repo

    async def handle(self, cmd: CreateOrderCommand) -> CreateOrderResult:
        order = Order.create(customer_id=cmd.customer_id, items=cmd.items)
        await self._orders.save(order)

        task_id = await self._tasks.enqueue(
            OrderConfirmationTask(
                order_id=order.id,
                next_attempt_at=datetime.now(timezone.utc),
                payload=cmd,
            )
        )

        return CreateOrderResult(order_id=order.id, task_id=task_id)

FastAPI-роутер маппит CreateOrderResult в 202 Accepted с заголовком Location: /orders/{id}.

# adapters/in/http/order_router.py
from fastapi import APIRouter, Response
from starlette.status import HTTP_202_ACCEPTED

router = APIRouter(prefix="/orders")

@router.post("", status_code=HTTP_202_ACCEPTED)
async def create_order(
    body: CreateOrderRequest,
    handler: CreateOrderHandler = Depends(get_create_order_handler),
) -> dict:
    result = await handler.handle(body.to_command())
    return {
        "status": result.status,
        "order_id": result.order_id,
        "task_id": result.task_id,
    }

3. Scheduler опрашивает due-задачи

# application/schedulers/order_confirmation_scheduler.py
import asyncio
import math
from datetime import datetime, timezone, timedelta

class OrderConfirmationScheduler:
    def __init__(
        self,
        task_repo: OrderConfirmationTaskRepository,
        payment_port: PaymentPort,
        event_publisher: DomainEventPublisher,
    ) -> None:
        self._tasks = task_repo
        self._payment = payment_port
        self._events = event_publisher

    async def run_due(self) -> None:
        due = await self._tasks.find_due_for_processing(limit=50)
        for task in due:
            try:
                status = await self._payment.get_status(task.order_id)
                if status == OrderStatus.CONFIRMED:
                    await self._tasks.mark_completed(task.task_id)
                    await self._events.publish(OrderConfirmedEvent(task.order_id))
                else:
                    await self._schedule_retry(task, "still pending")
            except Exception as exc:
                await self._schedule_retry(task, str(exc))

    async def _schedule_retry(self, task: OrderConfirmationTask, reason: str) -> None:
        next_count = task.retry_count + 1
        if next_count >= 20:
            await self._tasks.mark_failed(task.task_id, reason)
            return
        backoff = timedelta(seconds=min(2 ** next_count, 300))
        await self._tasks.schedule_retry(
            task.task_id,
            next_count,
            datetime.now(timezone.utc) + backoff,
            reason,
        )

Запуск через APScheduler (или asyncio-loop с asyncio.sleep(5) между прогонами):

# infrastructure/scheduler_runner.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler

def register_schedulers(app: FastAPI, scheduler: OrderConfirmationScheduler) -> None:
    aps = AsyncIOScheduler()
    aps.add_job(scheduler.run_due, "interval", seconds=5, max_instances=1)
    app.on_event("startup")(aps.start)
    app.on_event("shutdown")(aps.shutdown)

Что важно:

  • find_due_for_processing использует FOR UPDATE SKIP LOCKED LIMIT 50 — несколько pod'ов берут разные задачи без коллизий.
  • Exponential backoff с ограничением сверху (300s). После 20 попыток — FAILED + alert.
  • При успехе публикуется domain event, бизнес-флоу продолжается асинхронно.

4. Repository с FOR UPDATE SKIP LOCKED

# adapters/out/db/order_confirmation_task_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class SqlOrderConfirmationTaskRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def find_due_for_processing(self, limit: int) -> list[OrderConfirmationTask]:
        result = await self._session.execute(
            text("""
                SELECT task_id, order_id, external_id, retry_count, payload
                FROM order_confirmation_task
                WHERE status IN ('PENDING', 'IN_PROGRESS')
                  AND next_attempt_at <= NOW()
                ORDER BY next_attempt_at
                LIMIT :limit
                FOR UPDATE SKIP LOCKED
            """),
            {"limit": limit},
        )
        return [_row_to_task(row) for row in result.mappings()]

asyncio.sleep допустим только до 2 секунд

R-RES-ASYNC-2: asyncio.sleep в методе адаптера допустим только если общее ожидание меньше 2 секунд.

# adapters/out/sber/sber_adapter.py — ДОПУСТИМО
async def get_status_with_short_wait(self, order_id: int) -> OrderStatus:
    status = await self._do_get_status(order_id)
    if status == OrderStatus.PENDING:
        await asyncio.sleep(0.5)          # 500ms — в пределах transient retry
        status = await self._do_get_status(order_id)
    return status

Это в пределах обычного HTTP-вызова. Если ожидание больше — переходим на task-queue.

asyncio.timeout для async outbound

R-RES-ASYNC-3: для всех async outbound-вызовов asyncio.timeout() обязателен — retry, bulkhead, CB сами по себе не отменяют зависшую async-таску.

# adapters/out/sber/sber_adapter.py
async def register(self, order: Order) -> PaymentRef:
    async with self._sem:                               # bulkhead (R-RES-BH)
        try:
            async with self._breaker:                   # CB (purgatory/aiobreaker)
                async with asyncio.timeout(self._settings.total_timeout):
                    resp = await self._client.post(
                        "/register",
                        json=to_sber_request(order),
                    )
                    resp.raise_for_status()
                    return to_domain(resp.json())
        except CircuitBreakerError as exc:
            raise PaymentPortError.system_unavailable("sber") from exc
        except TimeoutError as exc:
            raise PaymentPortError.timeout("sber") from exc

asyncio.timeout оборачивает весь вызов целиком, включая CB и semaphore-acquire. Если внешняя система зависла — таска завершается по timeout гарантированно, а не ждёт бесконечно.

Пример pydantic-settings для timeout-конфига (R-RES-CFG-1):

# infrastructure/config/sber_settings.py
from pydantic_settings import BaseSettings

class SberClientSettings(BaseSettings):
    connect_timeout: float = 1.0
    read_timeout: float = 10.0
    write_timeout: float = 5.0
    pool_timeout: float = 2.0
    total_timeout: float = 15.0     # > read_timeout, < 60s

    model_config = {"env_prefix": "CLIENT_SBER__"}

Что запрещено

asyncio.sleep-цикл в handler

R-RES-ASYNC-X1: классический «подождём пока обработается».

# ПЛОХО — sleep-loop в handler
@router.post("/orders/{order_id}/confirm")
async def confirm_order(order_id: int, payment_port: PaymentPort = Depends(...)):
    await payment_port.confirm(order_id)
    for _ in range(30):                         # 30 × 1s = 30 секунд
        status = await payment_port.get_status(order_id)
        if status == OrderStatus.CONFIRMED:
            return {"status": "confirmed"}
        await asyncio.sleep(1)                  # ← держит ASGI-таску 30 секунд
    raise HTTPException(status_code=504, detail="confirmation timeout")

Что не так:

  • ASGI-таска заблокирована 30 секунд. При 50 одновременных запросах event-loop перегружен — новые запросы ждут.
  • При рестарте — все «ждущие» таски обрываются, операции теряются.
  • Upstream-timeout клиента (браузер, мобильное приложение) скорее всего меньше 30s.

Корректно: handler ставит polling-task в БД, возвращает 202 Accepted, scheduler опрашивает.

asyncio.sleep > 5 секунд

R-RES-ASYNC-X2: любой asyncio.sleep больше 5 секунд — запах task-queue.

# ПЛОХО — длинный sleep в адаптере
async def poll_until_ready(self, policy_id: str) -> PolicyStatus:
    status = await self._do_get_status(policy_id)
    if status == PolicyStatus.PENDING:
        await asyncio.sleep(15)             # ← 15 секунд!
        status = await self._do_get_status(policy_id)
    return status

Это — task-queue по форме, но в виде async-кода. Переписать:

  1. Поставить InsurancePollingTask в БД.
  2. Вернуть 202 Accepted.
  3. Scheduler через ~15 секунд вызовет get_status.

При масштабировании async-вариант ведёт себя предсказуемо хуже task-queue: каждый asyncio.sleep(15) удерживает таску, event-loop не свободен для других запросов.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
asyncio.sleep(N) в цикле в handlerR-RES-ASYNC-X1Task-queue с polling-scheduler
Любой asyncio.sleep > 5sR-RES-ASYNC-X2Task-queue
Async outbound без asyncio.timeout()R-RES-ASYNC-3asyncio.timeout(total_timeout) вокруг вызова
Polling-task без FOR UPDATE SKIP LOCKEDR-RES-ASYNC-1SKIP LOCKED для параллельных pod'ов
Возврат 200 OK за queued-операциюR-RES-ASYNC-1202 Accepted с task_id
Polling без max-retry-countR-RES-ASYNC-1После N попыток → FAILED + alert

Куда дальше

  • Retry — граница in-memory retry vs task-queue; tenacity exponential backoff.
  • Где какая защита — schedulers через task-queue, не in-memory обёртки.
  • Fallback — fallback с 202 Accepted + task-queue.
  • Timeouts — иерархия connect < read < total; httpx.Timeout + asyncio.timeout.
  • Circuit Breaker — open-state и как asyncio.timeout дополняет CB.
  • Bulkhead — asyncio.Semaphore per-system, не executor-пул.
  • Observability — метрики задач (PENDING/FAILED/retry_count) через prometheus-client.
  • Health Checks — probe с TTL-кешем, не бизнес-вызов.
  • Configuration — pydantic-settings для timeout/bulkhead per-system.
  • Per-system isolation — отдельный AsyncClient + CB + semaphore на систему.
  • OpenAPI generator binding — обёртки на адаптере, не на сгенерированном клиенте.