Опирается на правила: R-RES-OBS-1R-RES-OBS-3 и R-RES-OBS-X1 из Resilience Style Guide → раздел 12. Observability.

Важно знать

  • prometheus-client — основной инструмент экспорта метрик CB/retry/bulkhead; подключается вручную, автоматики как в R4J нет.
  • Метрики CB: circuit_breaker_state (gauge 0/1 по каждому state), circuit_breaker_calls_total (counters по outcome), circuit_breaker_failure_rate (gauge).
  • Метрики bulkhead: semaphore_available (gauge), semaphore_rejected_total (counter).
  • OTel-spans на каждом adapter-методе с атрибутами circuit_breaker.state и external.system — даёт связку «slow trace → CB был half_open».
  • Structured logging: structlog или logging с JSON-форматтером; WARN только на state-transition CB, не на каждый вызов.
  • Алёрты: CB в open > 5 минут; semaphore_available < 2 стабильно; retry_failed_total rate растёт.
  • Отключить метрики без причины (R-RES-OBS-X1) — значит SRE узнает, что CB Sber завис в half_open, только от клиентов.

Защита от отказов не работает вслепую. В Python-стеке нет автоматической R4J-интеграции с Micrometer — метрики, спаны и логи нужно подключить явно. Это раскрытие раздела 12 гайда в идиомах Python/FastAPI.

Метрики через prometheus-client

R-RES-OBS-1: для каждого adapter-инстанса регистрируются gauges/counters через prometheus-client.

# adapters/out/sber/sber_metrics.py
from prometheus_client import Counter, Gauge

class SberResilienceMetrics:
    def __init__(self, system: str = "sber") -> None:
        labels = {"system": system}

        self.cb_state = Gauge(
            "circuit_breaker_state",
            "Current state of circuit breaker (1 = active)",
            ["system", "state"],
        )
        for state in ("closed", "open", "half_open"):
            self.cb_state.labels(system=system, state=state).set(0)

        self.cb_calls = Counter(
            "circuit_breaker_calls_total",
            "Total adapter calls by outcome",
            ["system", "outcome"],
        )
        self.cb_failure_rate = Gauge(
            "circuit_breaker_failure_rate",
            "Current failure rate of circuit breaker",
            ["system"],
        )

        self.sem_available = Gauge(
            "semaphore_available",
            "Available semaphore slots",
            ["system"],
        )
        self.sem_rejected = Counter(
            "semaphore_rejected_total",
            "Calls rejected by semaphore bulkhead",
            ["system"],
        )

        self.retry_total = Counter(
            "retry_calls_total",
            "Total retry outcomes",
            ["system", "kind"],
        )
        self._system = system

    def set_cb_state(self, state: str) -> None:
        for s in ("closed", "open", "half_open"):
            self.cb_state.labels(system=self._system, state=s).set(1 if s == state else 0)

    def record_call(self, outcome: str) -> None:
        self.cb_calls.labels(system=self._system, outcome=outcome).inc()

    def update_failure_rate(self, rate: float) -> None:
        self.cb_failure_rate.labels(system=self._system).set(rate)

    def update_semaphore(self, available: int) -> None:
        self.sem_available.labels(system=self._system).set(available)

    def reject_semaphore(self) -> None:
        self.sem_rejected.labels(system=self._system).inc()

Что строится в Grafana:

  • Per-system панель: текущий state CB через circuit_breaker_state{state="open"}, calls/s по outcome.
  • Bulkhead utilization: 1 - semaphore_available / semaphore_max — насколько забит лимит одновременных вызовов.
  • Retry funnel: retry_calls_total{kind="succeeded"} / {kind="failed_with_retry"} / {kind="exhausted"}.

OTel-spans с атрибутами

R-RES-OBS-2: span на adapter-методе с circuit_breaker.state и external.system.

# adapters/out/sber/sber_adapter.py
import asyncio
import structlog
from opentelemetry import trace
from opentelemetry.trace import StatusCode

from core.port.payment_port import PaymentPort, PaymentPortError
from core.domain.order import Order, PaymentRef
from .sber_metrics import SberResilienceMetrics

log = structlog.get_logger(__name__)
tracer = trace.get_tracer(__name__)


class SberAdapter(PaymentPort):
    def __init__(
        self,
        client: httpx.AsyncClient,
        breaker: CircuitBreaker,
        sem: asyncio.Semaphore,
        metrics: SberResilienceMetrics,
        settings: SberClientSettings,
    ) -> None:
        self._client = client
        self._breaker = breaker
        self._sem = sem
        self._metrics = metrics
        self._settings = settings

    async def register(self, order: Order) -> PaymentRef:
        cb_state = self._breaker.current_state  # purgatory/aiobreaker API

        with tracer.start_as_current_span("sber.register") as span:
            span.set_attribute("external.system", "sber")
            span.set_attribute("circuit_breaker.state", cb_state)

            try:
                result = await self._call_with_protection(order)
                self._metrics.record_call("successful")
                return result
            except CircuitBreakerError as e:
                span.set_status(StatusCode.ERROR, "circuit breaker open")
                self._metrics.record_call("not_permitted")
                raise PaymentPortError.system_unavailable("sber") from e
            except Exception as e:
                span.record_exception(e)
                span.set_status(StatusCode.ERROR)
                self._metrics.record_call("failed")
                raise

    async def _call_with_protection(self, order: Order) -> PaymentRef:
        if self._sem._value == 0:
            self._metrics.reject_semaphore()
            raise PaymentPortError.bulkhead_full("sber")

        async with self._sem:
            self._metrics.update_semaphore(self._sem._value)
            async with self._breaker:
                async with asyncio.timeout(self._settings.total_timeout):
                    resp = await self._client.post(
                        "/register",
                        json=_to_sber_request(order),
                    )
                    resp.raise_for_status()
                    return _to_payment_ref(resp.json())

Что даёт связь с трейсингом:

  • В Jaeger / Tempo видно «slow trace 4.8s → span sber.register с circuit_breaker.state=half_open» — сразу ясно, что CB был на грани открытия.
  • При расследовании инцидента по трейсу сразу видно состояние CB в момент каждого запроса.

Structured logging на state-transition

R-RES-OBS-3: WARN только при переходах CB-state, не при каждом вызове.

# adapters/out/sber/sber_cb_listener.py
import structlog

log = structlog.get_logger(__name__)


def make_cb_listener(metrics: SberResilienceMetrics, system: str):
    """
    Возвращает callback для подключения к purgatory/aiobreaker events.
    """
    def on_state_change(prev_state: str, new_state: str, failure_rate: float) -> None:
        log.warning(
            "circuit_breaker_state_changed",
            system=system,
            prev_state=prev_state,
            new_state=new_state,
            failure_rate=failure_rate,
        )
        metrics.set_cb_state(new_state)
        metrics.update_failure_rate(failure_rate)

    return on_state_change

При bulkhead-отказе — аналогично, отдельный WARNING:

# в SberAdapter._call_with_protection

async def _call_with_protection(self, order: Order) -> PaymentRef:
    if self._sem._value == 0:
        self._metrics.reject_semaphore()
        log.warning("bulkhead_rejected", system="sber", sem_value=0)
        raise PaymentPortError.bulkhead_full("sber")
    ...

Что важно:

  • Только transitions (closed → open, open → half_open, half_open → closed). При каждом успешном call — тихо.
  • WARN-уровень. ERROR — слишком крикливо для ожидаемого поведения CB; INFO — слишком тихо для SRE.
  • JSON-поля структурированы (system, prev_state, new_state, failure_rate) — Loki/OpenSearch разбирают без regex.

Регистрация метрик в FastAPI

# main.py / lifespan
from prometheus_client import make_asgi_app
from fastapi import FastAPI

app = FastAPI()

# /metrics для Prometheus-scrape
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# infrastructure/di/sber.py
from cachetools import TTLCache
import asyncio, httpx
from adapters.out.sber.sber_adapter import SberAdapter
from adapters.out.sber.sber_metrics import SberResilienceMetrics
from adapters.out.sber.sber_cb_listener import make_cb_listener
from config.settings import SberClientSettings

def build_sber_adapter(settings: SberClientSettings) -> SberAdapter:
    metrics = SberResilienceMetrics(system="sber")

    client = httpx.AsyncClient(
        base_url=settings.base_url,
        timeout=httpx.Timeout(
            connect=settings.connect_timeout,
            read=settings.read_timeout,
            write=settings.read_timeout,
            pool=1.0,
        ),
        limits=httpx.Limits(
            max_connections=settings.max_connections,
            max_keepalive_connections=settings.max_keepalive,
        ),
    )

    breaker = CircuitBreaker(
        name="sber",
        failure_threshold=settings.cb_failure_threshold,
        recovery_timeout=settings.cb_open_duration,
        on_state_change=make_cb_listener(metrics, "sber"),
    )

    sem = asyncio.Semaphore(settings.max_concurrent)

    return SberAdapter(client=client, breaker=breaker, sem=sem, metrics=metrics, settings=settings)

Алёрты

Минимальный набор правил Prometheus для резилиенс-метрик:

# alerting/resilience.yaml
groups:
  - name: resilience
    rules:
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state{state="open"} == 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CB {{ $labels.system }} открыт > 5 минут"

      - alert: BulkheadNearExhaustion
        expr: semaphore_available < 2
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Bulkhead {{ $labels.system }} почти исчерпан"

      - alert: RetryExhaustedRising
        expr: rate(retry_calls_total{kind="exhausted"}[5m]) > 0.1
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Retry {{ $labels.system }} исчерпывается"

Что запрещено

АнтипаттернПравилоЧто взамен
prometheus-client не подключён / endpoint /metrics закрытR-RES-OBS-X1Всегда монтировать make_asgi_app() на /metrics
Нет gauge circuit_breaker_state — только счётчики вызововR-RES-OBS-1State-gauge обязателен: SRE видит текущее состояние, не только историю
Span на adapter-методе без атрибутов external.system и circuit_breaker.stateR-RES-OBS-2Оба атрибута; без них slow trace не связывается с CB-состоянием
Лог на каждый успешный вызов (INFO "sber.register ok")R-RES-OBS-3Только state-transition; при нагрузке логи вызовов — шум, transition — сигнал
WARNING на каждый bulkhead-reject в цикле без rate-limitR-RES-OBS-3Инкрементировать counter, логировать один раз при первом reject
Метрики только локально, без алёртовR-RES-OBS-1Минимум три алёрта: CB open, bulkhead exhausted, retry exhausted

Куда дальше

  • Async и polling — task-queue вместо asyncio.sleep-цикла при опросе внешних систем
  • Bulkhead — asyncio.Semaphore per-system и sizing
  • Circuit Breaker — state transitions и их семантика
  • Конфигурация — pydantic-settings per-system параметры
  • Fallback — явная обработка CircuitBreakerError и RetryError
  • Health checks — TTL-кеш probe и FastAPI readiness endpoint
  • OpenAPI generator binding — обёртки на adapter, не на сгенерированном клиенте
  • Per-system isolation — отдельный AsyncClient + CB + semaphore на систему
  • Retry — tenacity, exponential backoff, только при идемпотентности
  • Timeouts — httpx.Timeout + asyncio.timeout, иерархия connect < read < total
  • Where protection goes — outbound HTTP vs internal vs scheduler