Опирается на правила:
R-RES-ASYNC-1…R-RES-ASYNC-3иR-RES-ASYNC-X1…R-RES-ASYNC-X2из Resilience Style Guide → раздел 11. Async и polling.
Важно знать
- Polling внешней системы — через task-queue в БД, не
asyncio.sleep-цикл в handler.- Handler создаёт
<X>PollingTask(status=IN_PROGRESS,next_attempt_at=now()+5s), возвращает 202 Accepted.- Scheduler (
asyncio-loop или APScheduler каждые 5s,FOR UPDATE SKIP LOCKED) дёргает внешнюю систему, обновляет статус.- При успехе —
status=COMPLETED, продолжение бизнес-флоу через event/saga.- В методе адаптера
asyncio.sleepдопустим только если total wait<2s(короткий фиксированный backoff).- Для async outbound —
asyncio.timeout()/asyncio.wait_forобязателен (R-RES-ASYNC-3).asyncio.sleep-цикл в handler — главный антипаттерн: держит ASGI-таску, исчерпывает воркеры event-loop.asyncio.sleep > 5s— запах «должно быть task-queue».
Когда внешняя система не отвечает мгновенно — страховая асинхронно подтверждает полис, платёжный провайдер требует polling статуса по payment_id — соблазн велик: написать while True: await asyncio.sleep(3); status = await adapter.get_status(...). В sync-handler это убивает воркеры event-loop. Task-queue — стандартный способ сделать это правильно.
Polling через task-queue
R-RES-ASYNC-1: классическая структура для сценария с polling.
1. Схема task-таблицы
CREATE TABLE order_confirmation_task (
task_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
order_id BIGINT NOT NULL,
external_id TEXT,
status TEXT NOT NULL, -- PENDING / IN_PROGRESS / COMPLETED / FAILED
retry_count INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
WHERE status IN ('PENDING', 'IN_PROGRESS');
WHERE в partial-индексе — COMPLETED и FAILED не участвуют в сканировании due-строк.
2. Use Case handler ставит задачу
# application/use_cases/create_order.py
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
@dataclass(frozen=True)
class CreateOrderCommand:
customer_id: int
items: list[OrderItem]
@dataclass(frozen=True)
class CreateOrderResult:
order_id: int
task_id: int
status: str = "queued"
class CreateOrderHandler:
def __init__(
self,
order_repo: OrderRepository,
task_repo: OrderConfirmationTaskRepository,
) -> None:
self._orders = order_repo
self._tasks = task_repo
async def handle(self, cmd: CreateOrderCommand) -> CreateOrderResult:
order = Order.create(customer_id=cmd.customer_id, items=cmd.items)
await self._orders.save(order)
task_id = await self._tasks.enqueue(
OrderConfirmationTask(
order_id=order.id,
next_attempt_at=datetime.now(timezone.utc),
payload=cmd,
)
)
return CreateOrderResult(order_id=order.id, task_id=task_id)
FastAPI-роутер маппит CreateOrderResult в 202 Accepted с заголовком Location: /orders/{id}.
# adapters/in/http/order_router.py
from fastapi import APIRouter, Response
from starlette.status import HTTP_202_ACCEPTED
router = APIRouter(prefix="/orders")
@router.post("", status_code=HTTP_202_ACCEPTED)
async def create_order(
body: CreateOrderRequest,
handler: CreateOrderHandler = Depends(get_create_order_handler),
) -> dict:
result = await handler.handle(body.to_command())
return {
"status": result.status,
"order_id": result.order_id,
"task_id": result.task_id,
}
3. Scheduler опрашивает due-задачи
# application/schedulers/order_confirmation_scheduler.py
import asyncio
import math
from datetime import datetime, timezone, timedelta
class OrderConfirmationScheduler:
def __init__(
self,
task_repo: OrderConfirmationTaskRepository,
payment_port: PaymentPort,
event_publisher: DomainEventPublisher,
) -> None:
self._tasks = task_repo
self._payment = payment_port
self._events = event_publisher
async def run_due(self) -> None:
due = await self._tasks.find_due_for_processing(limit=50)
for task in due:
try:
status = await self._payment.get_status(task.order_id)
if status == OrderStatus.CONFIRMED:
await self._tasks.mark_completed(task.task_id)
await self._events.publish(OrderConfirmedEvent(task.order_id))
else:
await self._schedule_retry(task, "still pending")
except Exception as exc:
await self._schedule_retry(task, str(exc))
async def _schedule_retry(self, task: OrderConfirmationTask, reason: str) -> None:
next_count = task.retry_count + 1
if next_count >= 20:
await self._tasks.mark_failed(task.task_id, reason)
return
backoff = timedelta(seconds=min(2 ** next_count, 300))
await self._tasks.schedule_retry(
task.task_id,
next_count,
datetime.now(timezone.utc) + backoff,
reason,
)
Запуск через APScheduler (или asyncio-loop с asyncio.sleep(5) между прогонами):
# infrastructure/scheduler_runner.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
def register_schedulers(app: FastAPI, scheduler: OrderConfirmationScheduler) -> None:
aps = AsyncIOScheduler()
aps.add_job(scheduler.run_due, "interval", seconds=5, max_instances=1)
app.on_event("startup")(aps.start)
app.on_event("shutdown")(aps.shutdown)
Что важно:
find_due_for_processingиспользуетFOR UPDATE SKIP LOCKED LIMIT 50— несколько pod'ов берут разные задачи без коллизий.- Exponential backoff с ограничением сверху (300s). После 20 попыток —
FAILED+ alert. - При успехе публикуется domain event, бизнес-флоу продолжается асинхронно.
4. Repository с FOR UPDATE SKIP LOCKED
# adapters/out/db/order_confirmation_task_repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class SqlOrderConfirmationTaskRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def find_due_for_processing(self, limit: int) -> list[OrderConfirmationTask]:
result = await self._session.execute(
text("""
SELECT task_id, order_id, external_id, retry_count, payload
FROM order_confirmation_task
WHERE status IN ('PENDING', 'IN_PROGRESS')
AND next_attempt_at <= NOW()
ORDER BY next_attempt_at
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": limit},
)
return [_row_to_task(row) for row in result.mappings()]
asyncio.sleep допустим только до 2 секунд
R-RES-ASYNC-2: asyncio.sleep в методе адаптера допустим только если общее ожидание меньше 2 секунд.
# adapters/out/sber/sber_adapter.py — ДОПУСТИМО
async def get_status_with_short_wait(self, order_id: int) -> OrderStatus:
status = await self._do_get_status(order_id)
if status == OrderStatus.PENDING:
await asyncio.sleep(0.5) # 500ms — в пределах transient retry
status = await self._do_get_status(order_id)
return status
Это в пределах обычного HTTP-вызова. Если ожидание больше — переходим на task-queue.
asyncio.timeout для async outbound
R-RES-ASYNC-3: для всех async outbound-вызовов asyncio.timeout() обязателен — retry, bulkhead, CB сами по себе не отменяют зависшую async-таску.
# adapters/out/sber/sber_adapter.py
async def register(self, order: Order) -> PaymentRef:
async with self._sem: # bulkhead (R-RES-BH)
try:
async with self._breaker: # CB (purgatory/aiobreaker)
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.post(
"/register",
json=to_sber_request(order),
)
resp.raise_for_status()
return to_domain(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
except TimeoutError as exc:
raise PaymentPortError.timeout("sber") from exc
asyncio.timeout оборачивает весь вызов целиком, включая CB и semaphore-acquire. Если внешняя система зависла — таска завершается по timeout гарантированно, а не ждёт бесконечно.
Пример pydantic-settings для timeout-конфига (R-RES-CFG-1):
# infrastructure/config/sber_settings.py
from pydantic_settings import BaseSettings
class SberClientSettings(BaseSettings):
connect_timeout: float = 1.0
read_timeout: float = 10.0
write_timeout: float = 5.0
pool_timeout: float = 2.0
total_timeout: float = 15.0 # > read_timeout, < 60s
model_config = {"env_prefix": "CLIENT_SBER__"}
Что запрещено
asyncio.sleep-цикл в handler
R-RES-ASYNC-X1: классический «подождём пока обработается».
# ПЛОХО — sleep-loop в handler
@router.post("/orders/{order_id}/confirm")
async def confirm_order(order_id: int, payment_port: PaymentPort = Depends(...)):
await payment_port.confirm(order_id)
for _ in range(30): # 30 × 1s = 30 секунд
status = await payment_port.get_status(order_id)
if status == OrderStatus.CONFIRMED:
return {"status": "confirmed"}
await asyncio.sleep(1) # ← держит ASGI-таску 30 секунд
raise HTTPException(status_code=504, detail="confirmation timeout")
Что не так:
- ASGI-таска заблокирована 30 секунд. При 50 одновременных запросах event-loop перегружен — новые запросы ждут.
- При рестарте — все «ждущие» таски обрываются, операции теряются.
- Upstream-timeout клиента (браузер, мобильное приложение) скорее всего меньше 30s.
Корректно: handler ставит polling-task в БД, возвращает 202 Accepted, scheduler опрашивает.
asyncio.sleep > 5 секунд
R-RES-ASYNC-X2: любой asyncio.sleep больше 5 секунд — запах task-queue.
# ПЛОХО — длинный sleep в адаптере
async def poll_until_ready(self, policy_id: str) -> PolicyStatus:
status = await self._do_get_status(policy_id)
if status == PolicyStatus.PENDING:
await asyncio.sleep(15) # ← 15 секунд!
status = await self._do_get_status(policy_id)
return status
Это — task-queue по форме, но в виде async-кода. Переписать:
- Поставить
InsurancePollingTaskв БД. - Вернуть
202 Accepted. - Scheduler через ~15 секунд вызовет
get_status.
При масштабировании async-вариант ведёт себя предсказуемо хуже task-queue: каждый asyncio.sleep(15) удерживает таску, event-loop не свободен для других запросов.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
asyncio.sleep(N) в цикле в handler | R-RES-ASYNC-X1 | Task-queue с polling-scheduler |
Любой asyncio.sleep > 5s | R-RES-ASYNC-X2 | Task-queue |
Async outbound без asyncio.timeout() | R-RES-ASYNC-3 | asyncio.timeout(total_timeout) вокруг вызова |
Polling-task без FOR UPDATE SKIP LOCKED | R-RES-ASYNC-1 | SKIP LOCKED для параллельных pod'ов |
Возврат 200 OK за queued-операцию | R-RES-ASYNC-1 | 202 Accepted с task_id |
| Polling без max-retry-count | R-RES-ASYNC-1 | После N попыток → FAILED + alert |
Куда дальше
- Retry — граница in-memory retry vs task-queue;
tenacityexponential backoff. - Где какая защита — schedulers через task-queue, не in-memory обёртки.
- Fallback — fallback с
202 Accepted+ task-queue. - Timeouts — иерархия
connect < read < total;httpx.Timeout+asyncio.timeout. - Circuit Breaker — open-state и как
asyncio.timeoutдополняет CB. - Bulkhead —
asyncio.Semaphoreper-system, не executor-пул. - Observability — метрики задач (
PENDING/FAILED/retry_count) черезprometheus-client. - Health Checks — probe с TTL-кешем, не бизнес-вызов.
- Configuration —
pydantic-settingsдля timeout/bulkhead per-system. - Per-system isolation — отдельный
AsyncClient+ CB + semaphore на систему. - OpenAPI generator binding — обёртки на адаптере, не на сгенерированном клиенте.