Опирается на правила:
R-RES-CB-1…R-RES-CB-6иR-RES-CB-X1…R-RES-CB-X3из Resilience Style Guide → раздел 4. Circuit Breaker.
Важно знать
- CB — на public-методе out-adapter (
SberAdapter.register). Не на сгенерированном httpx-клиенте, не на handler, не на репозитории.- Используй
purgatory(async-native, context-manager) илиaiobreaker. Не пиши CB вручную наtry/except+ счётчике.- Count-based окно: ~50 вызовов, минимум 10 до открытия. Не time-based — неравномерный трафик делает time-based непредсказуемым.
- Failure rate threshold: 50% — дефолт; 30% — для критичных систем (платежи, фискализация).
- Open state: 30s ожидания, затем half-open с 3 пробными вызовами. Все успешны → closed; хотя бы один fail → open снова.
- Slow-call threshold ≈
read_timeout / 2. CB ловит «система медленно умирает» раньше, чем сработает сам timeout.- При open CB бросает
CircuitBreakerError. Адаптер маппит в port-specific исключение, handler — в 503 / 409.
Circuit Breaker — выключатель: когда внешняя система явно не работает, новые вызовы немедленно завершаются ошибкой без ожидания. Это защищает event-loop от заполнения зависшими тасками и даёт внешней системе время восстановиться без дополнительной нагрузки. Раскрытие раздела 4 гайда.
CB на public-методе out-adapter
R-RES-CB-1: обёртка CB — на public async-методе класса адаптера, который реализует port из core/. Не на сгенерированном клиенте, не на хелпере, не на handler.
# adapters/out/sber/sber_adapter.py
from purgatory import AsyncCircuitBreaker, CircuitBreakerError
import asyncio
import httpx
class SberAdapter:
"""Implements PaymentPort из core/ports/payment_port.py"""
def __init__(
self,
client: httpx.AsyncClient,
breaker: AsyncCircuitBreaker,
sem: asyncio.Semaphore,
settings: SberClientSettings,
mapper: SberMapper,
) -> None:
self._client = client
self._breaker = breaker
self._sem = sem
self._settings = settings
self._mapper = mapper
async def register(self, order: Order) -> PaymentRef:
async with self._sem: # bulkhead (R-RES-BH-1)
try:
async with self._breaker: # CB (R-RES-CB-1)
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.post(
"/register",
json=self._mapper.to_request(order),
)
resp.raise_for_status()
return self._mapper.to_domain(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
Почему именно так:
- Не на
httpx.AsyncClient— это сгенерированный транспорт; при регенерации из OpenAPI спеки обёртка исчезнет. - Не в общем хелпере
_execute_call(system_name)со строкой имени — теряется проверка имени CB в конфиге на старте. - Не на handler (
ConfirmOrderHandler) — handler оркеструет бизнес-логику, адаптер — граница защиты. - Не на репозитории — локальный SQL не имеет транзиентного режима (см. Где какая защита).
Конфигурация через pydantic-settings
R-RES-CFG-1 / R-RES-CFG-2: параметры CB — в декларативном конфиге, не хардкодом в коде.
# adapters/out/sber/sber_settings.py
from pydantic_settings import BaseSettings
from pydantic import Field
class SberClientSettings(BaseSettings):
connect_timeout: float = Field(2.0)
read_timeout: float = Field(30.0)
total_timeout: float = Field(35.0)
cb_window_size: int = Field(50)
cb_min_calls: int = Field(10)
cb_failure_rate: float = Field(0.30) # 30% — критичная система
cb_open_duration: float = Field(30.0) # секунды
cb_half_open_calls: int = Field(3)
cb_slow_threshold: float = Field(15.0) # = read_timeout / 2
max_connections: int = Field(10)
max_keepalive: int = Field(12) # = max_connections * 1.2 (R-RES-ISO-2)
model_config = {"env_prefix": "CLIENT_SBER__"}
Значения из environment CLIENT_SBER__CB_FAILURE_RATE=0.5 переопределяют дефолт — без пересборки.
Count-based окно, а не time-based
R-RES-CB-2: окно по количеству вызовов, не по времени.
# infrastructure/dependencies.py
from purgatory import AsyncCircuitBreaker
def build_sber_breaker(settings: SberClientSettings) -> AsyncCircuitBreaker:
return AsyncCircuitBreaker(
failure_threshold=settings.cb_failure_rate,
recovery_timeout=settings.cb_open_duration,
name="sber",
)
Почему count-based, а не time-based:
- Неравномерный трафик. Outbound к внешней системе идёт пачками после write-операций, между ними — тишина. Time-based окно (например, 30s) в паузу показывает пустой срез — CB никогда не откроется, хотя последние 15 вызовов были с ошибками.
- Предсказуемость. «50% из 50 последних» — понятный критерий: 25 failures из 50 → open. «50% за 30 секунд» зависит от того, был ли 1 вызов или 100 за этот период.
Failure rate 30% для критичных систем
R-RES-CB-3: дефолт 50% — для систем справочного характера. Для платежей и фискализации — 30%.
Рассуждение про 30% vs 50% для SberAdapter:
- При 50% threshold CB откроется после 25 failures из 50. Если каждый вызов ждёт 30s read-timeout — это 12.5 минут зависших вызовов до fast-fail режима.
- При 30% — после 15 failures из 50. Это 7.5 минут — на 40% быстрее.
- Trade-off: низкий threshold увеличивает риск ложных срабатываний на transient 5xx. Для money лучше быстрый fast-fail с повторной попыткой через task-queue, чем накапливать зависшие таски.
Half-open и пробные вызовы
R-RES-CB-4: после cb_open_duration: 30s CB переходит в half-open. Пропускает cb_half_open_calls: 3 пробных вызова:
- Все 3 успешны → closed.
- Хотя бы 1 fail → open, ещё 30s.
Почему 3, не 1 и не 10:
- 1 — хрупко: один transient 5xx во время восстановления возвращает в open на ещё 30s. Реально восстанавливающаяся система может бесконечно зацикливаться.
- 10 — избыточно: 10 одновременных calls создают нагрузочный всплеск на систему, которая только начала работать.
- 3 — репрезентативно и не перегружает.
Slow-call threshold
R-RES-CB-5: CB открывается не только на ошибках, но и на «медленных» успешных вызовах.
При purgatory slow-call threshold настраивается через кастомный predicate или middleware. Для aiobreaker — через timeout_duration. Пример с явным измерением:
import time
async def register(self, order: Order) -> PaymentRef:
async with self._sem:
try:
async with self._breaker:
start = time.monotonic()
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.post(
"/register",
json=self._mapper.to_request(order),
)
elapsed = time.monotonic() - start
if elapsed > self._settings.cb_slow_threshold:
raise SlowCallError(f"sber: {elapsed:.1f}s > threshold {self._settings.cb_slow_threshold}s")
resp.raise_for_status()
return self._mapper.to_domain(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
cb_slow_threshold = read_timeout / 2: половина от полного timeout — явный сигнал деградации, не разовый всплеск. Библиотека purgatory поддерживает настройку slow_call_duration нативно в следующих версиях; пока — явный raise в адаптере.
Маппинг CircuitBreakerError в port-specific исключение
R-RES-CB-6: когда CB открыт, purgatory бросает CircuitBreakerError. Адаптер маппит его в port-specific исключение, handler — в HTTP-статус.
# core/ports/payment_port.py
class PaymentPortError(Exception):
@classmethod
def system_unavailable(cls, system: str) -> "PaymentPortError":
return cls(f"Payment system '{system}' is unavailable")
# adapters/out/sber/sber_adapter.py
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
# bootstrap/exception_handlers.py (FastAPI)
from fastapi import Request
from fastapi.responses import JSONResponse
async def payment_port_error_handler(request: Request, exc: PaymentPortError) -> JSONResponse:
return JSONResponse(
status_code=503,
content={"error": "system_unavailable", "detail": "Платёжная система временно недоступна, попробуйте позже"},
)
app.add_exception_handler(PaymentPortError, payment_port_error_handler)
ProductPort (каталог) при open CB → 503. OrderPort при создании заказа → 409 Conflict, если бизнес-логика допускает «создать, но не оплатить» — это решение handler, не адаптера. Адаптер всегда бросает port-specific исключение, handler решает HTTP-статус.
Инициализация через DI-контейнер
Зависимости внедряются один раз при старте; каждый адаптер получает свой экземпляр CB и семафора:
# bootstrap/container.py
from purgatory import AsyncCircuitBreaker
import httpx, asyncio
async def build_sber_adapter(settings: SberClientSettings) -> SberAdapter:
client = httpx.AsyncClient(
base_url=settings.base_url,
timeout=httpx.Timeout(
connect=settings.connect_timeout,
read=settings.read_timeout,
write=settings.read_timeout,
pool=settings.connect_timeout,
),
limits=httpx.Limits(
max_connections=settings.max_connections,
max_keepalive_connections=settings.max_keepalive,
),
)
breaker = AsyncCircuitBreaker(
failure_threshold=settings.cb_failure_rate,
recovery_timeout=settings.cb_open_duration,
name="sber", # R-RES-ISO-3, R-RES-CB-X3
)
sem = asyncio.Semaphore(int(settings.max_connections * 0.8)) # R-RES-BH-3
return SberAdapter(client=client, breaker=breaker, sem=sem, settings=settings, mapper=SberMapper())
Для CustomerAdapter (сервис клиентов) — отдельный AsyncClient, AsyncCircuitBreaker(name="customer"), отдельный Semaphore. Изоляция по R-RES-ISO-1: зависание Sber не трогает pool клиентского сервиса.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| CB на репозитории / SQLAlchemy-вызове | R-RES-CB-X1 | Только на public-методе out-adapter |
Самописный CB на try/except + счётчик | R-RES-CB-X2 | purgatory / aiobreaker из коробки |
Один AsyncCircuitBreaker(name="default") на все системы | R-RES-CB-X3 | Per-system name (sber, receipt) |
| CB на сгенерированном httpx-клиенте | R-RES-OAS-X1 | На public-методе адаптера-обёртки |
Time-based окно (aiobreaker с time_unit) вместо count-based | R-RES-CB-2 | Count-based окно |
| Без slow-call threshold | R-RES-CB-5 | Явная проверка elapsed > read_timeout / 2 |
CircuitBreakerError пропускается наружу как есть | R-RES-CB-6 | Маппинг в PaymentPortError.system_unavailable |
Куда дальше
- Per-system isolation — отдельный
AsyncClient+ CB +Semaphoreна каждую систему. - Timeouts — иерархия
connect < read < totalвhttpx.Timeout+asyncio.timeout. - Bulkhead —
asyncio.Semaphoreкак замена semaphore-based bulkhead из Java. - Retry —
tenacityсwait_exponential, только при идемпотентности. - Fallback — что делать при open CB без
Money(0)и без «тихого успеха». - Конфигурация —
pydantic-settings+ per-system override без хардкода. - Observability —
prometheus-clientдля метрик CB, OTel-спаны, state-transition лог. - Async и polling — почему
asyncio.sleep-цикл в handler — антипаттерн. - Health checks — TTL-кеш probe на каждую систему в
/health/ready. - OpenAPI generator binding —
openapi-python-clientповерх httpx, CB на адаптере, не на сгенерированном коде. - Где какая защита — полная карта: outbound / internal / scheduler / inbound.