Опирается на правила: R-RES-BH-1R-RES-BH-3 и R-RES-BH-X1 из Resilience Style Guide → раздел 6. Bulkhead.

Важно знать

  • asyncio.Semaphore(max_concurrent) per-system — обязательный слой отдельно от httpx.Limits. Limits ограничивает TCP-соединения; семафор — количество одновременно выполняющихся async-вызовов.
  • Семафор, не executor. run_in_executor / ThreadPoolExecutor как «bulkhead» — антипаттерн: теряются contextvars (trace, MDC-эквивалент), создаются лишние треды. asyncio.Semaphore работает в текущей таске.
  • Sizing: max_concurrent ≈ limits.max_connections × 0.8. Запас 20% — семафор должен срабатывать раньше исчерпания пула.
  • Fail-fast: при исчерпании слотов — немедленный asyncio.TimeoutError или BulkheadError, не очередь на секунды.
  • Bulkhead защищает event loop от насыщения, когда внешняя система отвечает медленно, но Circuit Breaker ещё не открыт.
  • Семафор оборачивает public-метод out-adapter (R-RES-BH-1), не сгенерированный клиент и не handler.
  • Имя семафора совпадает с именем системы: sber, receipt, insurance — так же как CB и клиент (R-RES-ISO-3).

Если asyncio.timeout — «один зависший call не висит вечно», а Circuit Breaker — «после N подряд ошибок не пускаем новые», то Bulkhead — это «не более N одновременных вообще». Три слоя работают в связке: timeout ограничивает каждый вызов, bulkhead ограничивает их одновременное число, CB останавливает поток при явной деградации. Раскрытие раздела 6 гайда для Python async-стека.

Bulkhead и connection pool — два разных слоя

R-RES-BH-1: Bulkhead — это не дубль httpx.Limits. Это другой уровень защиты.

СлойЧто ограничиваетКогда срабатывает
httpx.Limits(max_connections=N)TCP-соединения (транспортный уровень)При создании нового соединения через AsyncClient
asyncio.Semaphore(max_concurrent)Async-таски, входящие в защищённый методПри вызове public-метода out-adapter

Почему два слоя:

  • При залипании вызова пул занят долго — ждёт httpx.Timeout (read или total). Это секунды.
  • Семафор отказывает немедленно, как только лимит достигнут. Без блокировки event loop.
  • Семафор срабатывает раньше исчерпания пула — fail-fast, не накапливание pending-тасок.

Пример: max_connections = 20, Semaphore(16). При 16 одновременных 17-й получает отказ мгновенно, не занимая коннект. 4 «слота» остаются в Limits как буфер на переходные моменты (idle keep-alive, slow start).

asyncio.Semaphore в async context manager

R-RES-BH-2: семафор работает в текущей таске. contextvars (OTel trace context, любой ContextVar) не теряются.

# adapters/out/sber/sber_adapter.py

class SberAdapter:
    def __init__(
        self,
        client: httpx.AsyncClient,
        breaker: CircuitBreaker,
        sem: asyncio.Semaphore,
        settings: SberClientSettings,
    ) -> None:
        self._client = client
        self._breaker = breaker
        self._sem = sem
        self._settings = settings

    async def register(self, order: Order) -> PaymentRef:
        try:
            async with asyncio.timeout(self._settings.bulkhead_acquire_timeout):
                await self._sem.acquire()
        except TimeoutError:
            raise PaymentPortError.system_unavailable("sber") from None

        try:
            async with self._breaker:
                async with asyncio.timeout(self._settings.total_timeout):
                    resp = await self._client.post(
                        "/register",
                        json=to_sber_request(order),
                    )
                    resp.raise_for_status()
                    return to_payment_ref(resp.json())
        except CircuitBreakerError as exc:
            raise PaymentPortError.system_unavailable("sber") from exc
        finally:
            self._sem.release()

Порядок слоёв — семафор снаружи, CB внутри. Семафор контролирует, сколько тасок одновременно входит в CB; CB решает, пропускать ли каждую из них дальше.

Sizing — pool × 0.8

R-RES-BH-3: max_concurrent ≈ max_connections × 0.8. Булкхед срабатывает раньше, чем кончаются коннекты.

# config/client_settings.py

from pydantic_settings import BaseSettings, SettingsConfigDict

class SberClientSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="CLIENT_SBER__")

    max_connections: int = 20
    max_keepalive_connections: int = 10
    connect_timeout: float = 1.0
    read_timeout: float = 5.0
    total_timeout: float = 7.0
    bulkhead_max_concurrent: int = 16        # = max_connections × 0.8
    bulkhead_acquire_timeout: float = 0.1    # 100ms — fail-fast, не очередь
# bootstrap/dependencies.py

from functools import lru_cache

@lru_cache
def get_sber_semaphore(settings: SberClientSettings = Depends(get_sber_settings)) -> asyncio.Semaphore:
    return asyncio.Semaphore(settings.bulkhead_max_concurrent)

Логика sizing:

  • При 16 одновременных (семафор полон) 17-й получает TimeoutError через bulkhead_acquire_timeout — немедленно.
  • При 21 вызове без семафора все 21 таска ждут в httpx на total_timeout. Это держит event loop насыщенным.
  • bulkhead_acquire_timeout = 0.1s — компромисс: покрывает быстрое освобождение слота при коротких вызовах, но не превращает семафор в очередь.

Маппинг SemaphoreError в port-исключение

При превышении acquire_timeout — стандартный asyncio.TimeoutError. Адаптер маппит его в port-specific исключение: семантика для handler — «система перегружена / временно недоступна».

# adapters/out/product/product_catalog_adapter.py

class ProductCatalogAdapter:
    async def find_product(self, product_id: ProductId) -> Product | None:
        try:
            async with asyncio.timeout(self._settings.bulkhead_acquire_timeout):
                await self._sem.acquire()
        except TimeoutError:
            raise ProductCatalogPortError.system_unavailable("product-catalog") from None

        try:
            async with self._breaker:
                async with asyncio.timeout(self._settings.total_timeout):
                    resp = await self._client.get(f"/products/{product_id.value}")
                    if resp.status_code == 404:
                        return None
                    resp.raise_for_status()
                    return to_product(resp.json())
        except CircuitBreakerError as exc:
            raise ProductCatalogPortError.system_unavailable("product-catalog") from exc
        finally:
            self._sem.release()

Handler обрабатывает ProductCatalogPortError.system_unavailable единообразно — 503 или fallback в task-queue. Подробно про маппинг — в Circuit Breaker.

Пример: CustomerAdapter с двумя внешними системами

R-RES-ISO-1 / R-RES-ISO-X1: на каждую систему — отдельный семафор. Два клиента с одним семафором — зависание одной системы блокирует слоты другой.

# adapters/out/customer/customer_verification_adapter.py

class CustomerVerificationAdapter:
    def __init__(
        self,
        sber_client: httpx.AsyncClient,
        sber_sem: asyncio.Semaphore,
        sber_breaker: CircuitBreaker,
        insurance_client: httpx.AsyncClient,
        insurance_sem: asyncio.Semaphore,
        insurance_breaker: CircuitBreaker,
        settings: CustomerVerificationSettings,
    ) -> None: ...

    async def verify_identity(self, customer_id: CustomerId) -> VerificationResult:
        await self._acquire(self._sber_sem, self._settings.sber_acquire_timeout, "sber")
        try:
            async with self._sber_breaker:
                async with asyncio.timeout(self._settings.sber_total):
                    resp = await self._sber_client.get(f"/customer/{customer_id.value}/kyc")
                    return to_verification_result(resp.json())
        except CircuitBreakerError as exc:
            raise CustomerVerificationPortError.system_unavailable("sber") from exc
        finally:
            self._sber_sem.release()

    async def check_insurance_eligibility(self, customer_id: CustomerId) -> bool:
        await self._acquire(self._insurance_sem, self._settings.insurance_acquire_timeout, "insurance")
        try:
            async with self._insurance_breaker:
                async with asyncio.timeout(self._settings.insurance_total):
                    resp = await self._insurance_client.get(f"/eligibility/{customer_id.value}")
                    return resp.json().get("eligible", False)
        except CircuitBreakerError as exc:
            raise CustomerVerificationPortError.system_unavailable("insurance") from exc
        finally:
            self._insurance_sem.release()

    @staticmethod
    async def _acquire(sem: asyncio.Semaphore, timeout: float, system: str) -> None:
        try:
            async with asyncio.timeout(timeout):
                await sem.acquire()
        except TimeoutError:
            raise CustomerVerificationPortError.system_unavailable(system) from None

Что запрещено

АнтипаттернПравилоЧто взамен
run_in_executor / ThreadPoolExecutor как bulkheadR-RES-BH-X1asyncio.Semaphore в текущей таске
Семафор отсутствует на out-adapterR-RES-BH-1asyncio.Semaphore обязателен
Semaphore(max_connections) без запасаR-RES-BH-3max_connections × 0.8
bulkhead_acquire_timeout > 1sR-RES-BH-30.1s — fail-fast, не очередь
Один семафор на несколько внешних системR-RES-ISO-X1Отдельный Semaphore per-system
Без маппинга TimeoutError в port-исключениеR-RES-CB-6raise PortError.system_unavailable(...)

Куда дальше

  • Per-system isolation — httpx.AsyncClient и httpx.Limits per-system.
  • Circuit Breaker — слой, работающий вместе с Bulkhead внутри семафора.
  • Fallback — что делать при system_unavailable из-за переполненного семафора.
  • Конфигурация — pydantic-settings и per-system override.
  • Observability — метрики semaphore_rejections_total через prometheus-client.
  • Async и polling — когда asyncio.sleep допустим, когда — task-queue.
  • Timeouts — иерархия connect < read < total в httpx.Timeout.
  • Retry — tenacity и согласование с bulkhead/CB.
  • Health checks — TTL-кеш и лёгкий probe.
  • Where protection goes — какие слои на каком уровне обязательны.
  • OpenAPI generator binding — bulkhead на адаптере, не на сгенерированном клиенте.