Опирается на правила:
R-RES-HC-1…R-RES-HC-4иR-RES-HC-X1…R-RES-HC-X2из Resilience Style Guide → раздел 10. Health checks.
Важно знать
- На каждую внешнюю систему — отдельная async-функция-индикатор:
check_sber_health,check_receipt_health.- Probe cached с TTL ~30s —
cachetools.TTLCacheили ручнойlast_result+last_probe: datetime. Не каждый/health/readyзапрос идёт в Sber.- Probe-метод — light:
GET /healthилиOPTIONS /(если внешняя система не предоставляет health-endpoint). Неregister, неget_order_statusс реальными данными.readinessвключает внешние системы;liveness— не включает. k8s не должен убивать pod из-за того, что Sber лежит.- Probe без кеша = самодельный DDoS: k8s с 5 pod'ами при интервале 10s даёт 30 RPS к внешней системе только от health.
- Business-операция в probe изменяет состояние, плодит тестовые данные, портит метрики.
- Actuator-аналог в FastAPI — роутер
/healthс отдельными эндпоинтами/health/liveи/health/ready.
Health check внешней системы — это способ k8s узнать, что pod не готов принимать трафик, не «глобальный монитор доступности». Поэтому probe должен быть дешёвым (для нас и для внешней системы) и точным (отражать реальную готовность). Раскрытие раздела 10 style guide в идиомах Python/FastAPI.
Индикатор на каждую внешнюю систему
R-RES-HC-1: на каждую систему — отдельный индикатор, не общий all_systems_ok.
# adapters/out/sber/sber_health.py
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
import httpx
TTL = timedelta(seconds=30)
@dataclass
class SystemHealth:
status: str # "up" | "down" | "unknown"
detail: str = ""
@dataclass
class SberHealthIndicator:
client: httpx.AsyncClient
_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
_last_result: SystemHealth = field(default_factory=lambda: SystemHealth("unknown"), init=False)
_last_probe: datetime = field(default_factory=lambda: datetime.min.replace(tzinfo=timezone.utc), init=False)
async def check(self) -> SystemHealth:
now = datetime.now(tz=timezone.utc)
if now - self._last_probe < TTL:
return self._last_result
async with self._lock:
if now - self._last_probe < TTL: # double-checked под локом
return self._last_result
self._last_probe = now
self._last_result = await self._probe()
return self._last_result
async def _probe(self) -> SystemHealth:
try:
resp = await self.client.get("/health", timeout=5.0)
if resp.status_code < 300:
return SystemHealth("up")
return SystemHealth("down", detail=f"status {resp.status_code}")
except Exception as exc:
return SystemHealth("down", detail=str(exc))
Индикатор — не singleton-функция, а объект с собственным AsyncClient (per-system по R-RES-ISO-1). Lock предотвращает параллельные probe при одновременных запросах к /health/ready.
TTL-кеш — cachetools или ручной
R-RES-HC-2: результат probe кешируется на TTL, не вычисляется на каждый вызов.
Вариант через cachetools.TTLCache — проще, но требует sync-обёртки для async:
# adapters/out/receipt/receipt_health.py
from cachetools import TTLCache
import asyncio
_cache: TTLCache = TTLCache(maxsize=1, ttl=30)
_lock = asyncio.Lock()
async def check_receipt_health(client: httpx.AsyncClient) -> SystemHealth:
async with _lock:
cached = _cache.get("result")
if cached is not None:
return cached
result = await _probe_receipt(client)
_cache["result"] = result
return result
async def _probe_receipt(client: httpx.AsyncClient) -> SystemHealth:
try:
resp = await client.options("/", timeout=3.0)
return SystemHealth("up")
except Exception as exc:
return SystemHealth("down", detail=str(exc))
Для сложных случаев (несколько систем, метрики, централизованный реестр) — класс из предыдущего раздела. Для простых — функция с TTLCache. Оба подхода дают одинаковый результат по R-RES-HC-2.
Light probe — GET /health или OPTIONS /
R-RES-HC-3: probe-метод не должен выполнять бизнес-операции.
# ХОРОШО — лёгкий технический endpoint
resp = await client.get("/health", timeout=5.0)
# ХОРОШО — если нет /health, OPTIONS на корень
resp = await client.options("/", timeout=3.0)
Если внешняя система не предоставляет /health:
- OPTIONS на корень — почти всегда отвечает без реальной обработки.
- Самый дешёвый GET с
?limit=1(например,/products?limit=1), если OPTIONS не поддерживается. - В крайнем случае — TCP-connect проверить без HTTP:
asyncio.open_connection(host, port).
Никаких бизнес-операций:
# ПЛОХО — probe делает реальный запрос к Sber
async def _probe(self) -> SystemHealth:
payload = {"amount": 1, "currency": "RUB", "order_id": "test-probe"}
resp = await self.client.post("/register", json=payload) # изменяет состояние
...
Каждые 30 секунд в Sber появляется тестовый заказ. После года эксплуатации — миллион тестовых записей. Бизнес-метрики искажены. Если probe идёт с retry (R-RES-RE-1) — три тестовых запроса за раз.
FastAPI router: /health/live и /health/ready
R-RES-HC-4: readiness включает внешние системы, liveness — не включает.
# api/health_router.py
from fastapi import APIRouter, Depends
from starlette.responses import JSONResponse
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/live")
async def liveness() -> dict:
return {"status": "up"}
@router.get("/ready")
async def readiness(
sber_health: SberHealthIndicator = Depends(),
receipt_health: ReceiptHealthIndicator = Depends(),
) -> JSONResponse:
checks = {
"sber": await sber_health.check(),
"receipt": await receipt_health.check(),
}
overall = "up" if all(c.status == "up" for c in checks.values()) else "down"
status_code = 200 if overall == "up" else 503
return JSONResponse(
content={"status": overall, "components": {k: {"status": v.status, "detail": v.detail} for k, v in checks.items()}},
status_code=status_code,
)
# k8s deployment
livenessProbe:
httpGet:
path: /health/live
port: 8080
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 8080
initialDelaySeconds: 15
periodSeconds: 10
Что важно:
/health/liveвозвращает200всегда, пока приложение запущено. k8s убивает pod только если liveness отвечает не-200 — перезапуск исправит зависший процесс, не Sber./health/readyвозвращает503, если критичные внешние системы недоступны. Pod выходит из Service backend pool — нет Sber-а, нет трафика на этот pod.- Какие системы включать в
ready— решение бизнес-домена. Если без Customer-service сервис физически ничего не может → включать. Если часть операций всё равно работает → не включать, handler сам вернёт 503.
Зависимости в DI-контейнере
Индикаторы живут как зависимости FastAPI, не как глобальные переменные:
# dependencies.py
from functools import lru_cache
import httpx
@lru_cache
def get_sber_health_indicator() -> SberHealthIndicator:
client = httpx.AsyncClient(
base_url=settings.sber.base_url,
limits=httpx.Limits(max_connections=5, max_keepalive_connections=3),
timeout=httpx.Timeout(connect=2.0, read=5.0, write=5.0, pool=1.0),
)
return SberHealthIndicator(client=client)
lru_cache гарантирует один экземпляр на процесс — состояние кеша (_last_result, _last_probe) сохраняется между запросами.
Закрытие клиента при завершении:
# main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
indicator = get_sber_health_indicator()
await indicator.client.aclose()
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Probe без TTL-кеша, ходит в систему на каждый /health/ready | R-RES-HC-X1 | TTLCache(maxsize=1, ttl=30) или datetime-based кеш |
Probe вызывает business-операцию (register, confirm_payment) | R-RES-HC-X2 | GET /health или OPTIONS / |
Внешние системы в livenessProbe | R-RES-HC-4 | Только в readinessProbe |
| Один общий индикатор на Sber + Receipt + Customer | R-RES-HC-1 | Per-system индикатор, отдельный объект |
check() без asyncio.Lock при TTL-проверке (race condition) | R-RES-HC-2 | Double-checked lock под asyncio.Lock |
AsyncClient для probe — тот же shared-клиент адаптера | R-RES-ISO-1 | Отдельный лёгкий клиент только для probe |
Куда дальше
- Async и polling — task-queue и
asyncio.sleepв адаптерах - Bulkhead —
asyncio.Semaphoreper-system, изоляция вызовов - Circuit breaker — purgatory/aiobreaker на public-методе адаптера
- Конфигурация —
pydantic-settingsдля параметров CB/retry/timeout - Fallback — деградация без money-операций, не «тихий успех»
- Observability — метрики CB/bulkhead через prometheus-client, OTel-спаны
- OpenAPI generator binding — клиент из спеки, mapper DTO→domain
- Per-system isolation — отдельный
AsyncClient+ CB + semaphore на систему - Retry — tenacity, exponential backoff, идемпотентность
- Timeouts —
httpx.Timeout+asyncio.timeout, иерархия connect<read<total - Где какая защита — outbound/internal/inbound, когда что применять