Опирается на правила: R-RES-CB-1R-RES-CB-6 и R-RES-CB-X1R-RES-CB-X3 из Resilience Style Guide → раздел 4. Circuit Breaker.

Важно знать

  • CB — на public-методе out-adapter (SberAdapter.register). Не на сгенерированном httpx-клиенте, не на handler, не на репозитории.
  • Используй purgatory (async-native, context-manager) или aiobreaker. Не пиши CB вручную на try/except + счётчике.
  • Count-based окно: ~50 вызовов, минимум 10 до открытия. Не time-based — неравномерный трафик делает time-based непредсказуемым.
  • Failure rate threshold: 50% — дефолт; 30% — для критичных систем (платежи, фискализация).
  • Open state: 30s ожидания, затем half-open с 3 пробными вызовами. Все успешны → closed; хотя бы один fail → open снова.
  • Slow-call thresholdread_timeout / 2. CB ловит «система медленно умирает» раньше, чем сработает сам timeout.
  • При open CB бросает CircuitBreakerError. Адаптер маппит в port-specific исключение, handler — в 503 / 409.

Circuit Breaker — выключатель: когда внешняя система явно не работает, новые вызовы немедленно завершаются ошибкой без ожидания. Это защищает event-loop от заполнения зависшими тасками и даёт внешней системе время восстановиться без дополнительной нагрузки. Раскрытие раздела 4 гайда.

CB на public-методе out-adapter

R-RES-CB-1: обёртка CB — на public async-методе класса адаптера, который реализует port из core/. Не на сгенерированном клиенте, не на хелпере, не на handler.

# adapters/out/sber/sber_adapter.py
from purgatory import AsyncCircuitBreaker, CircuitBreakerError
import asyncio
import httpx

class SberAdapter:
    """Implements PaymentPort из core/ports/payment_port.py"""

    def __init__(
        self,
        client: httpx.AsyncClient,
        breaker: AsyncCircuitBreaker,
        sem: asyncio.Semaphore,
        settings: SberClientSettings,
        mapper: SberMapper,
    ) -> None:
        self._client = client
        self._breaker = breaker
        self._sem = sem
        self._settings = settings
        self._mapper = mapper

    async def register(self, order: Order) -> PaymentRef:
        async with self._sem:                           # bulkhead (R-RES-BH-1)
            try:
                async with self._breaker:               # CB (R-RES-CB-1)
                    async with asyncio.timeout(self._settings.total_timeout):
                        resp = await self._client.post(
                            "/register",
                            json=self._mapper.to_request(order),
                        )
                        resp.raise_for_status()
                        return self._mapper.to_domain(resp.json())
            except CircuitBreakerError as exc:
                raise PaymentPortError.system_unavailable("sber") from exc

Почему именно так:

  • Не на httpx.AsyncClient — это сгенерированный транспорт; при регенерации из OpenAPI спеки обёртка исчезнет.
  • Не в общем хелпере _execute_call(system_name) со строкой имени — теряется проверка имени CB в конфиге на старте.
  • Не на handler (ConfirmOrderHandler) — handler оркеструет бизнес-логику, адаптер — граница защиты.
  • Не на репозитории — локальный SQL не имеет транзиентного режима (см. Где какая защита).

Конфигурация через pydantic-settings

R-RES-CFG-1 / R-RES-CFG-2: параметры CB — в декларативном конфиге, не хардкодом в коде.

# adapters/out/sber/sber_settings.py
from pydantic_settings import BaseSettings
from pydantic import Field

class SberClientSettings(BaseSettings):
    connect_timeout: float = Field(2.0)
    read_timeout: float = Field(30.0)
    total_timeout: float = Field(35.0)

    cb_window_size: int = Field(50)
    cb_min_calls: int = Field(10)
    cb_failure_rate: float = Field(0.30)    # 30% — критичная система
    cb_open_duration: float = Field(30.0)   # секунды
    cb_half_open_calls: int = Field(3)
    cb_slow_threshold: float = Field(15.0)  # = read_timeout / 2

    max_connections: int = Field(10)
    max_keepalive: int = Field(12)          # = max_connections * 1.2 (R-RES-ISO-2)

    model_config = {"env_prefix": "CLIENT_SBER__"}

Значения из environment CLIENT_SBER__CB_FAILURE_RATE=0.5 переопределяют дефолт — без пересборки.

Count-based окно, а не time-based

R-RES-CB-2: окно по количеству вызовов, не по времени.

# infrastructure/dependencies.py
from purgatory import AsyncCircuitBreaker

def build_sber_breaker(settings: SberClientSettings) -> AsyncCircuitBreaker:
    return AsyncCircuitBreaker(
        failure_threshold=settings.cb_failure_rate,
        recovery_timeout=settings.cb_open_duration,
        name="sber",
    )

Почему count-based, а не time-based:

  • Неравномерный трафик. Outbound к внешней системе идёт пачками после write-операций, между ними — тишина. Time-based окно (например, 30s) в паузу показывает пустой срез — CB никогда не откроется, хотя последние 15 вызовов были с ошибками.
  • Предсказуемость. «50% из 50 последних» — понятный критерий: 25 failures из 50 → open. «50% за 30 секунд» зависит от того, был ли 1 вызов или 100 за этот период.

Failure rate 30% для критичных систем

R-RES-CB-3: дефолт 50% — для систем справочного характера. Для платежей и фискализации — 30%.

Рассуждение про 30% vs 50% для SberAdapter:

  • При 50% threshold CB откроется после 25 failures из 50. Если каждый вызов ждёт 30s read-timeout — это 12.5 минут зависших вызовов до fast-fail режима.
  • При 30% — после 15 failures из 50. Это 7.5 минут — на 40% быстрее.
  • Trade-off: низкий threshold увеличивает риск ложных срабатываний на transient 5xx. Для money лучше быстрый fast-fail с повторной попыткой через task-queue, чем накапливать зависшие таски.

Half-open и пробные вызовы

R-RES-CB-4: после cb_open_duration: 30s CB переходит в half-open. Пропускает cb_half_open_calls: 3 пробных вызова:

  • Все 3 успешны → closed.
  • Хотя бы 1 fail → open, ещё 30s.

Почему 3, не 1 и не 10:

  • 1 — хрупко: один transient 5xx во время восстановления возвращает в open на ещё 30s. Реально восстанавливающаяся система может бесконечно зацикливаться.
  • 10 — избыточно: 10 одновременных calls создают нагрузочный всплеск на систему, которая только начала работать.
  • 3 — репрезентативно и не перегружает.

Slow-call threshold

R-RES-CB-5: CB открывается не только на ошибках, но и на «медленных» успешных вызовах.

При purgatory slow-call threshold настраивается через кастомный predicate или middleware. Для aiobreaker — через timeout_duration. Пример с явным измерением:

import time

async def register(self, order: Order) -> PaymentRef:
    async with self._sem:
        try:
            async with self._breaker:
                start = time.monotonic()
                async with asyncio.timeout(self._settings.total_timeout):
                    resp = await self._client.post(
                        "/register",
                        json=self._mapper.to_request(order),
                    )
                    elapsed = time.monotonic() - start
                    if elapsed > self._settings.cb_slow_threshold:
                        raise SlowCallError(f"sber: {elapsed:.1f}s > threshold {self._settings.cb_slow_threshold}s")
                    resp.raise_for_status()
                    return self._mapper.to_domain(resp.json())
        except CircuitBreakerError as exc:
            raise PaymentPortError.system_unavailable("sber") from exc

cb_slow_threshold = read_timeout / 2: половина от полного timeout — явный сигнал деградации, не разовый всплеск. Библиотека purgatory поддерживает настройку slow_call_duration нативно в следующих версиях; пока — явный raise в адаптере.

Маппинг CircuitBreakerError в port-specific исключение

R-RES-CB-6: когда CB открыт, purgatory бросает CircuitBreakerError. Адаптер маппит его в port-specific исключение, handler — в HTTP-статус.

# core/ports/payment_port.py
class PaymentPortError(Exception):
    @classmethod
    def system_unavailable(cls, system: str) -> "PaymentPortError":
        return cls(f"Payment system '{system}' is unavailable")
# adapters/out/sber/sber_adapter.py
except CircuitBreakerError as exc:
    raise PaymentPortError.system_unavailable("sber") from exc
# bootstrap/exception_handlers.py (FastAPI)
from fastapi import Request
from fastapi.responses import JSONResponse

async def payment_port_error_handler(request: Request, exc: PaymentPortError) -> JSONResponse:
    return JSONResponse(
        status_code=503,
        content={"error": "system_unavailable", "detail": "Платёжная система временно недоступна, попробуйте позже"},
    )

app.add_exception_handler(PaymentPortError, payment_port_error_handler)

ProductPort (каталог) при open CB → 503. OrderPort при создании заказа → 409 Conflict, если бизнес-логика допускает «создать, но не оплатить» — это решение handler, не адаптера. Адаптер всегда бросает port-specific исключение, handler решает HTTP-статус.

Инициализация через DI-контейнер

Зависимости внедряются один раз при старте; каждый адаптер получает свой экземпляр CB и семафора:

# bootstrap/container.py
from purgatory import AsyncCircuitBreaker
import httpx, asyncio

async def build_sber_adapter(settings: SberClientSettings) -> SberAdapter:
    client = httpx.AsyncClient(
        base_url=settings.base_url,
        timeout=httpx.Timeout(
            connect=settings.connect_timeout,
            read=settings.read_timeout,
            write=settings.read_timeout,
            pool=settings.connect_timeout,
        ),
        limits=httpx.Limits(
            max_connections=settings.max_connections,
            max_keepalive_connections=settings.max_keepalive,
        ),
    )
    breaker = AsyncCircuitBreaker(
        failure_threshold=settings.cb_failure_rate,
        recovery_timeout=settings.cb_open_duration,
        name="sber",                                    # R-RES-ISO-3, R-RES-CB-X3
    )
    sem = asyncio.Semaphore(int(settings.max_connections * 0.8))  # R-RES-BH-3
    return SberAdapter(client=client, breaker=breaker, sem=sem, settings=settings, mapper=SberMapper())

Для CustomerAdapter (сервис клиентов) — отдельный AsyncClient, AsyncCircuitBreaker(name="customer"), отдельный Semaphore. Изоляция по R-RES-ISO-1: зависание Sber не трогает pool клиентского сервиса.

Что запрещено

АнтипаттернПравилоЧто взамен
CB на репозитории / SQLAlchemy-вызовеR-RES-CB-X1Только на public-методе out-adapter
Самописный CB на try/except + счётчикR-RES-CB-X2purgatory / aiobreaker из коробки
Один AsyncCircuitBreaker(name="default") на все системыR-RES-CB-X3Per-system name (sber, receipt)
CB на сгенерированном httpx-клиентеR-RES-OAS-X1На public-методе адаптера-обёртки
Time-based окно (aiobreaker с time_unit) вместо count-basedR-RES-CB-2Count-based окно
Без slow-call thresholdR-RES-CB-5Явная проверка elapsed > read_timeout / 2
CircuitBreakerError пропускается наружу как естьR-RES-CB-6Маппинг в PaymentPortError.system_unavailable

Куда дальше

  • Per-system isolation — отдельный AsyncClient + CB + Semaphore на каждую систему.
  • Timeouts — иерархия connect < read < total в httpx.Timeout + asyncio.timeout.
  • Bulkhead — asyncio.Semaphore как замена semaphore-based bulkhead из Java.
  • Retry — tenacity с wait_exponential, только при идемпотентности.
  • Fallback — что делать при open CB без Money(0) и без «тихого успеха».
  • Конфигурация — pydantic-settings + per-system override без хардкода.
  • Observability — prometheus-client для метрик CB, OTel-спаны, state-transition лог.
  • Async и polling — почему asyncio.sleep-цикл в handler — антипаттерн.
  • Health checks — TTL-кеш probe на каждую систему в /health/ready.
  • OpenAPI generator binding — openapi-python-client поверх httpx, CB на адаптере, не на сгенерированном коде.
  • Где какая защита — полная карта: outbound / internal / scheduler / inbound.