Опирается на правила: R-RES-WHERE-1R-RES-WHERE-4 и R-RES-WHERE-X1 из Resilience Style Guide → раздел 1. Где какая защита.

Важно знать

  • Outbound HTTP к внешним системам (Сбер, СМС-провайдер, страховщик) — полный набор: httpx.Timeout + asyncio.timeout + circuit breaker + asyncio.Semaphore + опциональный @retry. Без CB первый «slow burn» внешней системы заблокирует все worker-таски.
  • Internal service-to-serviceasyncio.timeout + circuit breaker. Semaphore — по необходимости.
  • Schedulers и outbox-relay — task-queue через таблицу БД (status, retry_count, next_attempt_at), не in-memory tenacity. In-memory retry гибнет при рестарте процесса и не переживает отказы дольше нескольких секунд.
  • Inbound (наш FastAPI) — rate limit на API Gateway (Kong, Istio). В самом FastAPI — только в исключительных случаях, когда gateway недоступен.
  • Локальный код (репозиторий, SQLAlchemy, in-memory) — без CB и retry. Транзиентов нет; любой сбой здесь — реальная ошибка, а не флуктуация среды.
  • CB/retry/semaphore навешиваются на public-метод out-adapter, не на сгенерированный клиент и не на handler.
  • Всегда использовать готовую библиотеку (purgatory/aiobreaker/tenacity), не самописный счётчик на try/except — самописный CB является источником труднодиагностируемых ошибок.

httpx + tenacity + circuit breaker — не «навесить везде». У каждой группы вызовов своя категория защиты, и неверное размещение вредит больше, чем отсутствие защиты.

Outbound HTTP к внешним системам — полный набор

R-RES-WHERE-1: любой вызов к внешней системе защищается полным набором: timeout, circuit breaker, bulkhead и (при идемпотентности) retry.

# adapters/out/sber/sber_adapter.py
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from purgatory import AsyncCircuitBreaker, CircuitBreakerError

from core.port.payment_port import PaymentPort
from core.domain.order import Order
from core.domain.payment_ref import PaymentRef
from adapters.out.sber.mapper import to_sber_request, to_domain
from adapters.out.sber.errors import PaymentPortError


class SberAdapter(PaymentPort):

    def __init__(
        self,
        client: httpx.AsyncClient,
        breaker: AsyncCircuitBreaker,
        sem: asyncio.Semaphore,
    ) -> None:
        self._client = client
        self._breaker = breaker
        self._sem = sem

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)),
        reraise=True,
    )
    async def get_payment_status(self, order_id: str) -> PaymentRef:
        async with self._sem:                          # bulkhead  (R-RES-BH-1)
            try:
                async with self._breaker:              # CB        (R-RES-CB-1)
                    async with asyncio.timeout(30):    # time-limiter
                        resp = await self._client.get(f"/status/{order_id}")
                        resp.raise_for_status()
                        return to_domain(resp.json())
            except CircuitBreakerError as exc:
                raise PaymentPortError.system_unavailable("sber") from exc

    async def register(self, order: Order) -> PaymentRef:
        async with self._sem:
            try:
                async with self._breaker:
                    async with asyncio.timeout(30):
                        resp = await self._client.post(
                            "/register",
                            json=to_sber_request(order),
                            headers={"Idempotency-Key": str(order.order_id)},
                        )
                        resp.raise_for_status()
                        return to_domain(resp.json())
            except CircuitBreakerError as exc:
                raise PaymentPortError.system_unavailable("sber") from exc

Что делает каждый слой:

  • httpx.Timeout(connect=2, read=10, write=5, pool=1) — per-request лимиты на уровне HTTP-клиента; asyncio.timeout поверх — общий time-budget на вызов (R-RES-TO-1).
  • AsyncCircuitBreaker — fast-fail, когда система явно лежит: после N ошибок CB открывается, следующие вызовы падают немедленно без обращения к Сберу.
  • asyncio.Semaphore — ограничивает количество одновременных вызовов к данной системе. Работает в текущей asyncio-таске, contextvars (trace/MDC) не теряются (R-RES-BH-2).
  • @retry с tenacity — повтор только на get_payment_status, потому что это read. Метод register передаёт Idempotency-Key, что позволяет дополнить его retry при необходимости (R-RES-RE-1).

Без CB при медленном Сбере: 20 запросов зависают на 30s каждый, новые request'ы тоже виснут — весь event loop занят ожиданием. С CB: на 11-м вызове (после 10 failures) circuit открывается, следующие вызовы получают CircuitBreakerError немедленно и не нагружают ни Сбер, ни сервис.

Internal service-to-service — timeout + CB

R-RES-WHERE-2: вызовы между внутренними микросервисами тоже защищаются, но набор меньше.

# adapters/out/customer/customer_adapter.py
import asyncio
import httpx
from purgatory import AsyncCircuitBreaker, CircuitBreakerError

from core.port.customer_port import CustomerPort
from core.domain.customer import CustomerView
from adapters.out.customer.mapper import to_customer_view
from adapters.out.customer.errors import CustomerPortError


class CustomerAdapter(CustomerPort):

    def __init__(self, client: httpx.AsyncClient, breaker: AsyncCircuitBreaker) -> None:
        self._client = client
        self._breaker = breaker

    async def get_by_id(self, customer_id: str) -> CustomerView:
        try:
            async with self._breaker:
                async with asyncio.timeout(5):
                    resp = await self._client.get(f"/customers/{customer_id}")
                    resp.raise_for_status()
                    return to_customer_view(resp.json())
        except CircuitBreakerError as exc:
            raise CustomerPortError.service_unavailable("customer-service") from exc

Почему набор меньше:

  • Внутренние сервисы под нашим контролем — SLA предсказуем, retry-семантика прозрачна.
  • Semaphore добавляется, только если вызов идёт из горячего пути или может затопить пул.
  • Retry между нашими сервисами требует осторожности: write-методы без Idempotency-Key — под запретом (R-RES-RE-X1).

Schedulers и outbox-relay — task-queue, не tenacity

R-RES-WHERE-3: для scheduled-задач (cron, outbox-relay, polling-task) tenacity не подходит. Используется паттерн task-queue в БД.

Почему tenacity не подходит:

  • tenacity живёт в памяти процесса. Рестарт = потеря счётчиков retry и состояния.
  • @retry(stop=stop_after_attempt(100), wait=wait_fixed(60)) — это 100 минут блока coroutine'а.
  • Для отказов дольше 5–10 секунд нужен durable retry, а не in-memory.

Task-queue решает это через таблицу с next_attempt_at:

CREATE TABLE order_confirmation_task (
    task_id          BIGSERIAL PRIMARY KEY,
    order_id         UUID        NOT NULL,
    status           TEXT        NOT NULL DEFAULT 'PENDING',
    retry_count      INTEGER     NOT NULL DEFAULT 0,
    next_attempt_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_error       TEXT,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
    WHERE status IN ('PENDING', 'IN_PROGRESS');
# scheduler/order_confirmation_scheduler.py
import asyncio
from datetime import datetime, timedelta

from sqlalchemy.ext.asyncio import AsyncSession

from adapters.out.sber.sber_adapter import SberAdapter
from scheduler.task_repository import OrderConfirmationTaskRepository


async def process_pending(session: AsyncSession, adapter: SberAdapter) -> None:
    repo = OrderConfirmationTaskRepository(session)
    tasks = await repo.find_due_for_retry(limit=50)   # FOR UPDATE SKIP LOCKED

    for task in tasks:
        try:
            await adapter.confirm_order(task.order_id)
            await repo.mark_completed(task.task_id)
        except Exception as exc:
            next_at = datetime.utcnow() + timedelta(seconds=_backoff(task.retry_count))
            await repo.schedule_retry(task.task_id, str(exc), next_at)

    await session.commit()


def _backoff(retry_count: int) -> int:
    return min(30 * (2 ** retry_count), 3600)

Подробнее — в Async и polling.

Inbound REST — rate limit на edge

R-RES-WHERE-4: защита нашего FastAPI от перегрузки — это rate limit, и он живёт на API Gateway (Kong, Istio), не в каждом сервисе.

# Kong Plugin — rate-limiting (gateway-уровень)
plugins:
  - name: rate-limiting
    config:
      minute: 1000
      hour: 10000
      policy: redis

Почему на gateway:

  • Единая точка контроля для всех сервисов; правила не дублируются в каждом.
  • Защита происходит до того, как запрос дошёл до FastAPI — экономится CPU и connection.
  • Per-client лимиты по API-key / IP — gateway умеет это нативно.

Rate-limit в FastAPI с slowapi допустим только в legacy-сценарии, когда gateway недоступен:

# ДОПУСТИМО только без gateway — legacy-инсталляция
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@router.post("/orders")
@limiter.limit("100/minute")
async def create_order(request: Request, body: CreateOrderRequest) -> OrderResponse:
    ...

Что запрещено

CB и retry вокруг локального кода

R-RES-WHERE-X1: никакого CB / retry / semaphore на репозитории, SQLAlchemy-сессии или in-memory вычислении.

# ПЛОХО — CB на репозитории
class OrderRepository:

    @retry(stop=stop_after_attempt(3))      # ← бессмысленно
    async def find_by_id(self, order_id: str) -> Order | None:
        result = await self._session.get(OrderModel, order_id)
        return to_domain(result) if result else None

Что не так:

  • Транзиентов нет. PostgreSQL либо доступен, либо нет. Если соединение оборвалось — это реальный сбой; reconnect обеспечивает пул SQLAlchemy, а не tenacity.
  • CB-state бессмысленен. «Открытый» CB на репозитории означает, что сервис не работает — лучше пусть упадёт и pod рестартует.
  • Метрики мусорят. circuit_breaker_state{name="order_repo"} ничего не говорит SRE — она всегда либо closed, либо «всё плохо».
# ПРАВИЛЬНО — репозиторий без resilience-обёрток
class OrderRepository:

    async def find_by_id(self, order_id: str) -> Order | None:
        result = await self._session.get(OrderModel, order_id)
        return to_domain(result) if result else None

SQLAlchemy + asyncpg сами управляют reconnect и пулом соединений. На уровне репозитория защищать нечего.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
CB / retry на репозитории / SQLAlchemy-сессииR-RES-WHERE-X1Убрать — нет транзиентов
@retry на async-функции сервисного слоя без outboundR-RES-WHERE-X1Не нужно
In-memory tenacity для долгих отказов (>30s)R-RES-WHERE-3Task-queue с БД
Rate-limit в FastAPI-роутере вместо API GatewayR-RES-WHERE-4Centralized rate-limit на edge
Outbound без CB и SemaphoreR-RES-WHERE-1Полный набор обязателен
Один shared AsyncClient на несколько внешних системR-RES-ISO-X1Отдельный клиент per-system
AsyncClient() без явных limits и timeoutR-RES-ISO-X2httpx.Limits + httpx.Timeout

Куда дальше

  • Per-system isolation — отдельный AsyncClient, CB и семафор на каждую систему.
  • Timeouts — иерархия connect < read < total, httpx.Timeout + asyncio.timeout.
  • Circuit Breaker — настройка скользящего окна и порогов для purgatory/aiobreaker.
  • Retry — когда применять tenacity и когда нельзя.
  • Bulkhead — asyncio.Semaphore per-system, sizing относительно пула.
  • Fallback — деградация без «тихого успеха».
  • Async и polling — task-queue для долгих отказов вместо asyncio.sleep-цикла.
  • Health checks — cachetools.TTLCache и лёгкий probe для readiness.
  • Observability — метрики CB через prometheus-client, OTel-спаны на адаптере.
  • Configuration — pydantic-settings per-system, дефолты и переопределения.
  • OpenAPI generator binding — где вешать обёртки при генерации клиента из спеки.