Опирается на правила:
R-RES-OBS-1…R-RES-OBS-3иR-RES-OBS-X1из Resilience Style Guide → раздел 12. Observability.
Важно знать
- prometheus-client — основной инструмент экспорта метрик CB/retry/bulkhead; подключается вручную, автоматики как в R4J нет.
- Метрики CB:
circuit_breaker_state(gauge 0/1 по каждому state),circuit_breaker_calls_total(counters поoutcome),circuit_breaker_failure_rate(gauge).- Метрики bulkhead:
semaphore_available(gauge),semaphore_rejected_total(counter).- OTel-spans на каждом adapter-методе с атрибутами
circuit_breaker.stateиexternal.system— даёт связку «slow trace → CB был half_open».- Structured logging: structlog или logging с JSON-форматтером; WARN только на state-transition CB, не на каждый вызов.
- Алёрты: CB в
open> 5 минут;semaphore_available < 2стабильно;retry_failed_totalrate растёт.- Отключить метрики без причины (
R-RES-OBS-X1) — значит SRE узнает, что CB Sber завис в half_open, только от клиентов.
Защита от отказов не работает вслепую. В Python-стеке нет автоматической R4J-интеграции с Micrometer — метрики, спаны и логи нужно подключить явно. Это раскрытие раздела 12 гайда в идиомах Python/FastAPI.
Метрики через prometheus-client
R-RES-OBS-1: для каждого adapter-инстанса регистрируются gauges/counters через prometheus-client.
# adapters/out/sber/sber_metrics.py
from prometheus_client import Counter, Gauge
class SberResilienceMetrics:
def __init__(self, system: str = "sber") -> None:
labels = {"system": system}
self.cb_state = Gauge(
"circuit_breaker_state",
"Current state of circuit breaker (1 = active)",
["system", "state"],
)
for state in ("closed", "open", "half_open"):
self.cb_state.labels(system=system, state=state).set(0)
self.cb_calls = Counter(
"circuit_breaker_calls_total",
"Total adapter calls by outcome",
["system", "outcome"],
)
self.cb_failure_rate = Gauge(
"circuit_breaker_failure_rate",
"Current failure rate of circuit breaker",
["system"],
)
self.sem_available = Gauge(
"semaphore_available",
"Available semaphore slots",
["system"],
)
self.sem_rejected = Counter(
"semaphore_rejected_total",
"Calls rejected by semaphore bulkhead",
["system"],
)
self.retry_total = Counter(
"retry_calls_total",
"Total retry outcomes",
["system", "kind"],
)
self._system = system
def set_cb_state(self, state: str) -> None:
for s in ("closed", "open", "half_open"):
self.cb_state.labels(system=self._system, state=s).set(1 if s == state else 0)
def record_call(self, outcome: str) -> None:
self.cb_calls.labels(system=self._system, outcome=outcome).inc()
def update_failure_rate(self, rate: float) -> None:
self.cb_failure_rate.labels(system=self._system).set(rate)
def update_semaphore(self, available: int) -> None:
self.sem_available.labels(system=self._system).set(available)
def reject_semaphore(self) -> None:
self.sem_rejected.labels(system=self._system).inc()
Что строится в Grafana:
- Per-system панель: текущий state CB через
circuit_breaker_state{state="open"}, calls/s по outcome. - Bulkhead utilization:
1 - semaphore_available / semaphore_max— насколько забит лимит одновременных вызовов. - Retry funnel:
retry_calls_total{kind="succeeded"}/{kind="failed_with_retry"}/{kind="exhausted"}.
OTel-spans с атрибутами
R-RES-OBS-2: span на adapter-методе с circuit_breaker.state и external.system.
# adapters/out/sber/sber_adapter.py
import asyncio
import structlog
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from core.port.payment_port import PaymentPort, PaymentPortError
from core.domain.order import Order, PaymentRef
from .sber_metrics import SberResilienceMetrics
log = structlog.get_logger(__name__)
tracer = trace.get_tracer(__name__)
class SberAdapter(PaymentPort):
def __init__(
self,
client: httpx.AsyncClient,
breaker: CircuitBreaker,
sem: asyncio.Semaphore,
metrics: SberResilienceMetrics,
settings: SberClientSettings,
) -> None:
self._client = client
self._breaker = breaker
self._sem = sem
self._metrics = metrics
self._settings = settings
async def register(self, order: Order) -> PaymentRef:
cb_state = self._breaker.current_state # purgatory/aiobreaker API
with tracer.start_as_current_span("sber.register") as span:
span.set_attribute("external.system", "sber")
span.set_attribute("circuit_breaker.state", cb_state)
try:
result = await self._call_with_protection(order)
self._metrics.record_call("successful")
return result
except CircuitBreakerError as e:
span.set_status(StatusCode.ERROR, "circuit breaker open")
self._metrics.record_call("not_permitted")
raise PaymentPortError.system_unavailable("sber") from e
except Exception as e:
span.record_exception(e)
span.set_status(StatusCode.ERROR)
self._metrics.record_call("failed")
raise
async def _call_with_protection(self, order: Order) -> PaymentRef:
if self._sem._value == 0:
self._metrics.reject_semaphore()
raise PaymentPortError.bulkhead_full("sber")
async with self._sem:
self._metrics.update_semaphore(self._sem._value)
async with self._breaker:
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.post(
"/register",
json=_to_sber_request(order),
)
resp.raise_for_status()
return _to_payment_ref(resp.json())
Что даёт связь с трейсингом:
- В Jaeger / Tempo видно «slow trace 4.8s → span
sber.registerсcircuit_breaker.state=half_open» — сразу ясно, что CB был на грани открытия. - При расследовании инцидента по трейсу сразу видно состояние CB в момент каждого запроса.
Structured logging на state-transition
R-RES-OBS-3: WARN только при переходах CB-state, не при каждом вызове.
# adapters/out/sber/sber_cb_listener.py
import structlog
log = structlog.get_logger(__name__)
def make_cb_listener(metrics: SberResilienceMetrics, system: str):
"""
Возвращает callback для подключения к purgatory/aiobreaker events.
"""
def on_state_change(prev_state: str, new_state: str, failure_rate: float) -> None:
log.warning(
"circuit_breaker_state_changed",
system=system,
prev_state=prev_state,
new_state=new_state,
failure_rate=failure_rate,
)
metrics.set_cb_state(new_state)
metrics.update_failure_rate(failure_rate)
return on_state_change
При bulkhead-отказе — аналогично, отдельный WARNING:
# в SberAdapter._call_with_protection
async def _call_with_protection(self, order: Order) -> PaymentRef:
if self._sem._value == 0:
self._metrics.reject_semaphore()
log.warning("bulkhead_rejected", system="sber", sem_value=0)
raise PaymentPortError.bulkhead_full("sber")
...
Что важно:
- Только transitions (
closed → open,open → half_open,half_open → closed). При каждом успешном call — тихо. - WARN-уровень. ERROR — слишком крикливо для ожидаемого поведения CB; INFO — слишком тихо для SRE.
- JSON-поля структурированы (
system,prev_state,new_state,failure_rate) — Loki/OpenSearch разбирают без regex.
Регистрация метрик в FastAPI
# main.py / lifespan
from prometheus_client import make_asgi_app
from fastapi import FastAPI
app = FastAPI()
# /metrics для Prometheus-scrape
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# infrastructure/di/sber.py
from cachetools import TTLCache
import asyncio, httpx
from adapters.out.sber.sber_adapter import SberAdapter
from adapters.out.sber.sber_metrics import SberResilienceMetrics
from adapters.out.sber.sber_cb_listener import make_cb_listener
from config.settings import SberClientSettings
def build_sber_adapter(settings: SberClientSettings) -> SberAdapter:
metrics = SberResilienceMetrics(system="sber")
client = httpx.AsyncClient(
base_url=settings.base_url,
timeout=httpx.Timeout(
connect=settings.connect_timeout,
read=settings.read_timeout,
write=settings.read_timeout,
pool=1.0,
),
limits=httpx.Limits(
max_connections=settings.max_connections,
max_keepalive_connections=settings.max_keepalive,
),
)
breaker = CircuitBreaker(
name="sber",
failure_threshold=settings.cb_failure_threshold,
recovery_timeout=settings.cb_open_duration,
on_state_change=make_cb_listener(metrics, "sber"),
)
sem = asyncio.Semaphore(settings.max_concurrent)
return SberAdapter(client=client, breaker=breaker, sem=sem, metrics=metrics, settings=settings)
Алёрты
Минимальный набор правил Prometheus для резилиенс-метрик:
# alerting/resilience.yaml
groups:
- name: resilience
rules:
- alert: CircuitBreakerOpen
expr: circuit_breaker_state{state="open"} == 1
for: 5m
labels:
severity: warning
annotations:
summary: "CB {{ $labels.system }} открыт > 5 минут"
- alert: BulkheadNearExhaustion
expr: semaphore_available < 2
for: 2m
labels:
severity: warning
annotations:
summary: "Bulkhead {{ $labels.system }} почти исчерпан"
- alert: RetryExhaustedRising
expr: rate(retry_calls_total{kind="exhausted"}[5m]) > 0.1
for: 3m
labels:
severity: warning
annotations:
summary: "Retry {{ $labels.system }} исчерпывается"
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
prometheus-client не подключён / endpoint /metrics закрыт | R-RES-OBS-X1 | Всегда монтировать make_asgi_app() на /metrics |
Нет gauge circuit_breaker_state — только счётчики вызовов | R-RES-OBS-1 | State-gauge обязателен: SRE видит текущее состояние, не только историю |
Span на adapter-методе без атрибутов external.system и circuit_breaker.state | R-RES-OBS-2 | Оба атрибута; без них slow trace не связывается с CB-состоянием |
| Лог на каждый успешный вызов (INFO "sber.register ok") | R-RES-OBS-3 | Только state-transition; при нагрузке логи вызовов — шум, transition — сигнал |
| WARNING на каждый bulkhead-reject в цикле без rate-limit | R-RES-OBS-3 | Инкрементировать counter, логировать один раз при первом reject |
| Метрики только локально, без алёртов | R-RES-OBS-1 | Минимум три алёрта: CB open, bulkhead exhausted, retry exhausted |
Куда дальше
- Async и polling — task-queue вместо
asyncio.sleep-цикла при опросе внешних систем - Bulkhead —
asyncio.Semaphoreper-system и sizing - Circuit Breaker — state transitions и их семантика
- Конфигурация —
pydantic-settingsper-system параметры - Fallback — явная обработка
CircuitBreakerErrorиRetryError - Health checks — TTL-кеш probe и FastAPI readiness endpoint
- OpenAPI generator binding — обёртки на adapter, не на сгенерированном клиенте
- Per-system isolation — отдельный
AsyncClient+ CB + semaphore на систему - Retry — tenacity, exponential backoff, только при идемпотентности
- Timeouts —
httpx.Timeout+asyncio.timeout, иерархия connect < read < total - Where protection goes — outbound HTTP vs internal vs scheduler