Опирается на правила:
R-RES-BH-1…R-RES-BH-3иR-RES-BH-X1из Resilience Style Guide → раздел 6. Bulkhead.
Важно знать
asyncio.Semaphore(max_concurrent)per-system — обязательный слой отдельно отhttpx.Limits.Limitsограничивает TCP-соединения; семафор — количество одновременно выполняющихся async-вызовов.- Семафор, не executor.
run_in_executor/ThreadPoolExecutorкак «bulkhead» — антипаттерн: теряютсяcontextvars(trace, MDC-эквивалент), создаются лишние треды.asyncio.Semaphoreработает в текущей таске.- Sizing:
max_concurrent ≈ limits.max_connections × 0.8. Запас 20% — семафор должен срабатывать раньше исчерпания пула.- Fail-fast: при исчерпании слотов — немедленный
asyncio.TimeoutErrorилиBulkheadError, не очередь на секунды.- Bulkhead защищает event loop от насыщения, когда внешняя система отвечает медленно, но Circuit Breaker ещё не открыт.
- Семафор оборачивает public-метод out-adapter (
R-RES-BH-1), не сгенерированный клиент и не handler.- Имя семафора совпадает с именем системы:
sber,receipt,insurance— так же как CB и клиент (R-RES-ISO-3).
Если asyncio.timeout — «один зависший call не висит вечно», а Circuit Breaker — «после N подряд ошибок не пускаем новые», то Bulkhead — это «не более N одновременных вообще». Три слоя работают в связке: timeout ограничивает каждый вызов, bulkhead ограничивает их одновременное число, CB останавливает поток при явной деградации. Раскрытие раздела 6 гайда для Python async-стека.
Bulkhead и connection pool — два разных слоя
R-RES-BH-1: Bulkhead — это не дубль httpx.Limits. Это другой уровень защиты.
| Слой | Что ограничивает | Когда срабатывает |
|---|---|---|
httpx.Limits(max_connections=N) | TCP-соединения (транспортный уровень) | При создании нового соединения через AsyncClient |
asyncio.Semaphore(max_concurrent) | Async-таски, входящие в защищённый метод | При вызове public-метода out-adapter |
Почему два слоя:
- При залипании вызова пул занят долго — ждёт
httpx.Timeout(read или total). Это секунды. - Семафор отказывает немедленно, как только лимит достигнут. Без блокировки event loop.
- Семафор срабатывает раньше исчерпания пула — fail-fast, не накапливание pending-тасок.
Пример: max_connections = 20, Semaphore(16). При 16 одновременных 17-й получает отказ мгновенно, не занимая коннект. 4 «слота» остаются в Limits как буфер на переходные моменты (idle keep-alive, slow start).
asyncio.Semaphore в async context manager
R-RES-BH-2: семафор работает в текущей таске. contextvars (OTel trace context, любой ContextVar) не теряются.
# adapters/out/sber/sber_adapter.py
class SberAdapter:
def __init__(
self,
client: httpx.AsyncClient,
breaker: CircuitBreaker,
sem: asyncio.Semaphore,
settings: SberClientSettings,
) -> None:
self._client = client
self._breaker = breaker
self._sem = sem
self._settings = settings
async def register(self, order: Order) -> PaymentRef:
try:
async with asyncio.timeout(self._settings.bulkhead_acquire_timeout):
await self._sem.acquire()
except TimeoutError:
raise PaymentPortError.system_unavailable("sber") from None
try:
async with self._breaker:
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.post(
"/register",
json=to_sber_request(order),
)
resp.raise_for_status()
return to_payment_ref(resp.json())
except CircuitBreakerError as exc:
raise PaymentPortError.system_unavailable("sber") from exc
finally:
self._sem.release()
Порядок слоёв — семафор снаружи, CB внутри. Семафор контролирует, сколько тасок одновременно входит в CB; CB решает, пропускать ли каждую из них дальше.
Sizing — pool × 0.8
R-RES-BH-3: max_concurrent ≈ max_connections × 0.8. Булкхед срабатывает раньше, чем кончаются коннекты.
# config/client_settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class SberClientSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="CLIENT_SBER__")
max_connections: int = 20
max_keepalive_connections: int = 10
connect_timeout: float = 1.0
read_timeout: float = 5.0
total_timeout: float = 7.0
bulkhead_max_concurrent: int = 16 # = max_connections × 0.8
bulkhead_acquire_timeout: float = 0.1 # 100ms — fail-fast, не очередь
# bootstrap/dependencies.py
from functools import lru_cache
@lru_cache
def get_sber_semaphore(settings: SberClientSettings = Depends(get_sber_settings)) -> asyncio.Semaphore:
return asyncio.Semaphore(settings.bulkhead_max_concurrent)
Логика sizing:
- При 16 одновременных (семафор полон) 17-й получает
TimeoutErrorчерезbulkhead_acquire_timeout— немедленно. - При 21 вызове без семафора все 21 таска ждут в
httpxнаtotal_timeout. Это держит event loop насыщенным. bulkhead_acquire_timeout = 0.1s— компромисс: покрывает быстрое освобождение слота при коротких вызовах, но не превращает семафор в очередь.
Маппинг SemaphoreError в port-исключение
При превышении acquire_timeout — стандартный asyncio.TimeoutError. Адаптер маппит его в port-specific исключение: семантика для handler — «система перегружена / временно недоступна».
# adapters/out/product/product_catalog_adapter.py
class ProductCatalogAdapter:
async def find_product(self, product_id: ProductId) -> Product | None:
try:
async with asyncio.timeout(self._settings.bulkhead_acquire_timeout):
await self._sem.acquire()
except TimeoutError:
raise ProductCatalogPortError.system_unavailable("product-catalog") from None
try:
async with self._breaker:
async with asyncio.timeout(self._settings.total_timeout):
resp = await self._client.get(f"/products/{product_id.value}")
if resp.status_code == 404:
return None
resp.raise_for_status()
return to_product(resp.json())
except CircuitBreakerError as exc:
raise ProductCatalogPortError.system_unavailable("product-catalog") from exc
finally:
self._sem.release()
Handler обрабатывает ProductCatalogPortError.system_unavailable единообразно — 503 или fallback в task-queue. Подробно про маппинг — в Circuit Breaker.
Пример: CustomerAdapter с двумя внешними системами
R-RES-ISO-1 / R-RES-ISO-X1: на каждую систему — отдельный семафор. Два клиента с одним семафором — зависание одной системы блокирует слоты другой.
# adapters/out/customer/customer_verification_adapter.py
class CustomerVerificationAdapter:
def __init__(
self,
sber_client: httpx.AsyncClient,
sber_sem: asyncio.Semaphore,
sber_breaker: CircuitBreaker,
insurance_client: httpx.AsyncClient,
insurance_sem: asyncio.Semaphore,
insurance_breaker: CircuitBreaker,
settings: CustomerVerificationSettings,
) -> None: ...
async def verify_identity(self, customer_id: CustomerId) -> VerificationResult:
await self._acquire(self._sber_sem, self._settings.sber_acquire_timeout, "sber")
try:
async with self._sber_breaker:
async with asyncio.timeout(self._settings.sber_total):
resp = await self._sber_client.get(f"/customer/{customer_id.value}/kyc")
return to_verification_result(resp.json())
except CircuitBreakerError as exc:
raise CustomerVerificationPortError.system_unavailable("sber") from exc
finally:
self._sber_sem.release()
async def check_insurance_eligibility(self, customer_id: CustomerId) -> bool:
await self._acquire(self._insurance_sem, self._settings.insurance_acquire_timeout, "insurance")
try:
async with self._insurance_breaker:
async with asyncio.timeout(self._settings.insurance_total):
resp = await self._insurance_client.get(f"/eligibility/{customer_id.value}")
return resp.json().get("eligible", False)
except CircuitBreakerError as exc:
raise CustomerVerificationPortError.system_unavailable("insurance") from exc
finally:
self._insurance_sem.release()
@staticmethod
async def _acquire(sem: asyncio.Semaphore, timeout: float, system: str) -> None:
try:
async with asyncio.timeout(timeout):
await sem.acquire()
except TimeoutError:
raise CustomerVerificationPortError.system_unavailable(system) from None
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
run_in_executor / ThreadPoolExecutor как bulkhead | R-RES-BH-X1 | asyncio.Semaphore в текущей таске |
| Семафор отсутствует на out-adapter | R-RES-BH-1 | asyncio.Semaphore обязателен |
Semaphore(max_connections) без запаса | R-RES-BH-3 | max_connections × 0.8 |
bulkhead_acquire_timeout > 1s | R-RES-BH-3 | 0.1s — fail-fast, не очередь |
| Один семафор на несколько внешних систем | R-RES-ISO-X1 | Отдельный Semaphore per-system |
Без маппинга TimeoutError в port-исключение | R-RES-CB-6 | raise PortError.system_unavailable(...) |
Куда дальше
- Per-system isolation —
httpx.AsyncClientиhttpx.Limitsper-system. - Circuit Breaker — слой, работающий вместе с Bulkhead внутри семафора.
- Fallback — что делать при
system_unavailableиз-за переполненного семафора. - Конфигурация —
pydantic-settingsи per-system override. - Observability — метрики
semaphore_rejections_totalчерезprometheus-client. - Async и polling — когда
asyncio.sleepдопустим, когда — task-queue. - Timeouts — иерархия
connect < read < totalвhttpx.Timeout. - Retry —
tenacityи согласование с bulkhead/CB. - Health checks — TTL-кеш и лёгкий probe.
- Where protection goes — какие слои на каком уровне обязательны.
- OpenAPI generator binding — bulkhead на адаптере, не на сгенерированном клиенте.