Опирается на правила:
R-RES-WHERE-1…R-RES-WHERE-4иR-RES-WHERE-X1из Resilience Style Guide → раздел 1. Где какая защита.
Важно знать
- Outbound HTTP к внешним системам (Сбер, СМС-провайдер, страховщик) — полный набор:
httpx.Timeout+asyncio.timeout+ circuit breaker +asyncio.Semaphore+ опциональный@retry. Без CB первый «slow burn» внешней системы заблокирует все worker-таски.- Internal service-to-service —
asyncio.timeout+ circuit breaker. Semaphore — по необходимости.- Schedulers и outbox-relay — task-queue через таблицу БД (
status,retry_count,next_attempt_at), не in-memorytenacity. In-memory retry гибнет при рестарте процесса и не переживает отказы дольше нескольких секунд.- Inbound (наш FastAPI) — rate limit на API Gateway (Kong, Istio). В самом FastAPI — только в исключительных случаях, когда gateway недоступен.
- Локальный код (репозиторий, SQLAlchemy, in-memory) — без CB и retry. Транзиентов нет; любой сбой здесь — реальная ошибка, а не флуктуация среды.
- CB/retry/semaphore навешиваются на public-метод out-adapter, не на сгенерированный клиент и не на handler.
- Всегда использовать готовую библиотеку (
purgatory/aiobreaker/tenacity), не самописный счётчик наtry/except— самописный CB является источником труднодиагностируемых ошибок.
httpx + tenacity + circuit breaker — не «навесить везде». У каждой группы вызовов своя категория защиты, и неверное размещение вредит больше, чем отсутствие защиты.
Outbound HTTP к внешним системам — полный набор
R-RES-WHERE-1: любой вызов к внешней системе защищается полным набором: timeout, circuit breaker, bulkhead и (при идемпотентности) retry.
# adapters/out/sber/sber_adapter.py
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from purgatory import AsyncCircuitBreaker, CircuitBreakerError
from core.port.payment_port import PaymentPort
from core.domain.order import Order
from core.domain.payment_ref import PaymentRef
from adapters.out.sber.mapper import to_sber_request, to_domain
from adapters.out.sber.errors import PaymentPortError
class SberAdapter(PaymentPort):
def __init__(
self,
client: httpx.AsyncClient,
breaker: AsyncCircuitBreaker,
sem: asyncio.Semaphore,
) -> None:
self._client = client
self._breaker = breaker
self._sem = sem
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)),
reraise=True,
)
async def get_payment_status(self, order_id: str) -> PaymentRef:
async with self._sem: # bulkhead (R-RES-BH-1)
try:
async with self._breaker: # CB (R-RES-CB-1)
async with asyncio.timeout(30): # time-limiter
resp = await self._client.get(f"/status/{order_id}")
resp.raise_for_status()
return to_domain(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
async def register(self, order: Order) -> PaymentRef:
async with self._sem:
try:
async with self._breaker:
async with asyncio.timeout(30):
resp = await self._client.post(
"/register",
json=to_sber_request(order),
headers={"Idempotency-Key": str(order.order_id)},
)
resp.raise_for_status()
return to_domain(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
Что делает каждый слой:
httpx.Timeout(connect=2, read=10, write=5, pool=1)— per-request лимиты на уровне HTTP-клиента;asyncio.timeoutповерх — общий time-budget на вызов (R-RES-TO-1).AsyncCircuitBreaker— fast-fail, когда система явно лежит: после N ошибок CB открывается, следующие вызовы падают немедленно без обращения к Сберу.asyncio.Semaphore— ограничивает количество одновременных вызовов к данной системе. Работает в текущейasyncio-таске,contextvars(trace/MDC) не теряются (R-RES-BH-2).@retryсtenacity— повтор только наget_payment_status, потому что это read. МетодregisterпередаётIdempotency-Key, что позволяет дополнить его retry при необходимости (R-RES-RE-1).
Без CB при медленном Сбере: 20 запросов зависают на 30s каждый, новые request'ы тоже виснут — весь event loop занят ожиданием. С CB: на 11-м вызове (после 10 failures) circuit открывается, следующие вызовы получают CircuitBreakerError немедленно и не нагружают ни Сбер, ни сервис.
Internal service-to-service — timeout + CB
R-RES-WHERE-2: вызовы между внутренними микросервисами тоже защищаются, но набор меньше.
# adapters/out/customer/customer_adapter.py
import asyncio
import httpx
from purgatory import AsyncCircuitBreaker, CircuitBreakerError
from core.port.customer_port import CustomerPort
from core.domain.customer import CustomerView
from adapters.out.customer.mapper import to_customer_view
from adapters.out.customer.errors import CustomerPortError
class CustomerAdapter(CustomerPort):
def __init__(self, client: httpx.AsyncClient, breaker: AsyncCircuitBreaker) -> None:
self._client = client
self._breaker = breaker
async def get_by_id(self, customer_id: str) -> CustomerView:
try:
async with self._breaker:
async with asyncio.timeout(5):
resp = await self._client.get(f"/customers/{customer_id}")
resp.raise_for_status()
return to_customer_view(resp.json())
except CircuitBreakerError as exc:
raise CustomerPortError.service_unavailable("customer-service") from exc
Почему набор меньше:
- Внутренние сервисы под нашим контролем — SLA предсказуем, retry-семантика прозрачна.
- Semaphore добавляется, только если вызов идёт из горячего пути или может затопить пул.
- Retry между нашими сервисами требует осторожности: write-методы без
Idempotency-Key— под запретом (R-RES-RE-X1).
Schedulers и outbox-relay — task-queue, не tenacity
R-RES-WHERE-3: для scheduled-задач (cron, outbox-relay, polling-task) tenacity не подходит. Используется паттерн task-queue в БД.
Почему tenacity не подходит:
tenacityживёт в памяти процесса. Рестарт = потеря счётчиков retry и состояния.@retry(stop=stop_after_attempt(100), wait=wait_fixed(60))— это 100 минут блока coroutine'а.- Для отказов дольше 5–10 секунд нужен durable retry, а не in-memory.
Task-queue решает это через таблицу с next_attempt_at:
CREATE TABLE order_confirmation_task (
task_id BIGSERIAL PRIMARY KEY,
order_id UUID NOT NULL,
status TEXT NOT NULL DEFAULT 'PENDING',
retry_count INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
WHERE status IN ('PENDING', 'IN_PROGRESS');
# scheduler/order_confirmation_scheduler.py
import asyncio
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from adapters.out.sber.sber_adapter import SberAdapter
from scheduler.task_repository import OrderConfirmationTaskRepository
async def process_pending(session: AsyncSession, adapter: SberAdapter) -> None:
repo = OrderConfirmationTaskRepository(session)
tasks = await repo.find_due_for_retry(limit=50) # FOR UPDATE SKIP LOCKED
for task in tasks:
try:
await adapter.confirm_order(task.order_id)
await repo.mark_completed(task.task_id)
except Exception as exc:
next_at = datetime.utcnow() + timedelta(seconds=_backoff(task.retry_count))
await repo.schedule_retry(task.task_id, str(exc), next_at)
await session.commit()
def _backoff(retry_count: int) -> int:
return min(30 * (2 ** retry_count), 3600)
Подробнее — в Async и polling.
Inbound REST — rate limit на edge
R-RES-WHERE-4: защита нашего FastAPI от перегрузки — это rate limit, и он живёт на API Gateway (Kong, Istio), не в каждом сервисе.
# Kong Plugin — rate-limiting (gateway-уровень)
plugins:
- name: rate-limiting
config:
minute: 1000
hour: 10000
policy: redis
Почему на gateway:
- Единая точка контроля для всех сервисов; правила не дублируются в каждом.
- Защита происходит до того, как запрос дошёл до FastAPI — экономится CPU и connection.
- Per-client лимиты по API-key / IP — gateway умеет это нативно.
Rate-limit в FastAPI с slowapi допустим только в legacy-сценарии, когда gateway недоступен:
# ДОПУСТИМО только без gateway — legacy-инсталляция
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@router.post("/orders")
@limiter.limit("100/minute")
async def create_order(request: Request, body: CreateOrderRequest) -> OrderResponse:
...
Что запрещено
CB и retry вокруг локального кода
R-RES-WHERE-X1: никакого CB / retry / semaphore на репозитории, SQLAlchemy-сессии или in-memory вычислении.
# ПЛОХО — CB на репозитории
class OrderRepository:
@retry(stop=stop_after_attempt(3)) # ← бессмысленно
async def find_by_id(self, order_id: str) -> Order | None:
result = await self._session.get(OrderModel, order_id)
return to_domain(result) if result else None
Что не так:
- Транзиентов нет. PostgreSQL либо доступен, либо нет. Если соединение оборвалось — это реальный сбой; reconnect обеспечивает пул SQLAlchemy, а не
tenacity. - CB-state бессмысленен. «Открытый» CB на репозитории означает, что сервис не работает — лучше пусть упадёт и pod рестартует.
- Метрики мусорят.
circuit_breaker_state{name="order_repo"}ничего не говорит SRE — она всегда либоclosed, либо «всё плохо».
# ПРАВИЛЬНО — репозиторий без resilience-обёрток
class OrderRepository:
async def find_by_id(self, order_id: str) -> Order | None:
result = await self._session.get(OrderModel, order_id)
return to_domain(result) if result else None
SQLAlchemy + asyncpg сами управляют reconnect и пулом соединений. На уровне репозитория защищать нечего.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| CB / retry на репозитории / SQLAlchemy-сессии | R-RES-WHERE-X1 | Убрать — нет транзиентов |
@retry на async-функции сервисного слоя без outbound | R-RES-WHERE-X1 | Не нужно |
In-memory tenacity для долгих отказов (>30s) | R-RES-WHERE-3 | Task-queue с БД |
| Rate-limit в FastAPI-роутере вместо API Gateway | R-RES-WHERE-4 | Centralized rate-limit на edge |
| Outbound без CB и Semaphore | R-RES-WHERE-1 | Полный набор обязателен |
Один shared AsyncClient на несколько внешних систем | R-RES-ISO-X1 | Отдельный клиент per-system |
AsyncClient() без явных limits и timeout | R-RES-ISO-X2 | httpx.Limits + httpx.Timeout |
Куда дальше
- Per-system isolation — отдельный
AsyncClient, CB и семафор на каждую систему. - Timeouts — иерархия
connect < read < total,httpx.Timeout+asyncio.timeout. - Circuit Breaker — настройка скользящего окна и порогов для
purgatory/aiobreaker. - Retry — когда применять
tenacityи когда нельзя. - Bulkhead —
asyncio.Semaphoreper-system, sizing относительно пула. - Fallback — деградация без «тихого успеха».
- Async и polling — task-queue для долгих отказов вместо
asyncio.sleep-цикла. - Health checks —
cachetools.TTLCacheи лёгкий probe для readiness. - Observability — метрики CB через
prometheus-client, OTel-спаны на адаптере. - Configuration —
pydantic-settingsper-system, дефолты и переопределения. - OpenAPI generator binding — где вешать обёртки при генерации клиента из спеки.