Опирается на правила: R-RES-BH-1R-RES-BH-3 и R-RES-BH-X1 из Resilience Rules → раздел 6. Bulkhead.

Важно знать

  • semaphore.NewWeighted(maxConcurrent) из golang.org/x/sync/semaphore — обязательный слой отдельно от http.Transport connection pool. Pool ограничивает TCP-соединения; семафор — одновременные вызовы.
  • Semaphore работает в той же горутине. context.Context, OTel-трейс, логгер — всё сохраняется. Отдельного пула горутин не создаётся.
  • Sizing: maxConcurrent ≈ MaxIdleConnsPerHost × 0.8 — семафор срабатывает раньше исчерпания pool'а.
  • sem.Acquire(ctx, 1) уважает ctx.Done() — при отмене контекста (timeout, cancel) немедленно возвращает ошибку, не блокирует горутину.
  • При переполнении sem.Acquire возвращает ctx.Err() — адаптер маппит в port-specific ошибку системной недоступности.
  • Bulkhead защищает сервис от исчерпания горутин-воркеров, когда внешняя система отвечает медленно, но CB ещё не открыт.
  • Один semaphore.Weighted — на одну систему. sber, insurance, receipt — каждая со своим семафором.

Если timeout — это «один зависший вызов не висит вечно», а Circuit Breaker — «после N ошибок поток прекращаем», то Bulkhead — «не более N одновременных в принципе». Три слоя работают в связке: timeout ограничивает длительность каждого вызова, bulkhead ограничивает их одновременное количество, CB останавливает поток при явной деградации. Раскрытие правил R-RES-BH-* для Go-стека.

Bulkhead и connection pool — два разных слоя

R-RES-BH-1: Bulkhead — не дубль connection pool. Это другой слой защиты с другим моментом срабатывания.

СлойЧто ограничиваетКогда срабатывает
http.Transport.MaxIdleConnsPerHostTCP-соединенияпри выдаче коннекта из pool
semaphore.Weightedодновременные вызовыпри входе в метод адаптера

Разница в скорости отказа:

  • При залипании вызова pool занят секунды — ждёт CallTimeout или ResponseHeaderTimeout.
  • Семафор отказывает новым вызовам сразу, как только лимит достигнут.

Пример: MaxIdleConnsPerHost = 25, maxConcurrent = 20. При 20 одновременных вызовах 21-й получает ошибку от sem.Acquire мгновенно, не создавая новую горутину для HTTP-запроса. 5 свободных слотов — буфер на переходные моменты (keep-alive соединения возвращаются в idle).

Semaphore-based: как устроен

R-RES-BH-2: semaphore.NewWeighted(maxConcurrent) из пакета golang.org/x/sync/semaphore.

// adapters/out/sber/sber_adapter.go
type SberAdapter struct {
    client  *http.Client
    breaker *gobreaker.CircuitBreaker
    sem     *semaphore.Weighted
    cfg     SberClientConfig
}

func NewSberAdapter(cfg SberClientConfig) *SberAdapter {
    return &SberAdapter{
        client:  newSberHTTPClient(cfg),
        breaker: newSberBreaker("sber"),
        sem:     semaphore.NewWeighted(int64(cfg.MaxConcurrent)),
        cfg:     cfg,
    }
}

Порядок слоёв в методе адаптера: сначала sem.Acquire (bulkhead), затем вход в breaker.Execute (CB), внутри — HTTP-вызов с контекстным таймаутом.

func (a *SberAdapter) Register(ctx context.Context, order Order) (PaymentRef, error) {
    ctx, span := otel.Tracer("sber-adapter").Start(ctx, "SberAdapter.Register")
    defer span.End()
    span.SetAttributes(
        attribute.String("external.system", "sber"),
        attribute.String("circuit_breaker.state", a.breaker.State().String()),
    )

    if err := a.sem.Acquire(ctx, 1); err != nil {
        bulkheadRejectedTotal.WithLabelValues("sber").Inc()
        return PaymentRef{}, &PaymentSystemUnavailableError{System: "sber", Cause: err}
    }
    defer a.sem.Release(1)

    raw, err := a.breaker.Execute(func() (any, error) {
        callCtx, cancel := capTimeout(ctx, a.cfg.CallTimeout)
        defer cancel()
        return a.doRegister(callCtx, order)
    })
    if err != nil {
        if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
            return PaymentRef{}, &PaymentSystemUnavailableError{System: "sber", Cause: err}
        }
        return PaymentRef{}, fmt.Errorf("sber register: %w", err)
    }
    return raw.(PaymentRef), nil
}

sem.Acquire(ctx, 1) — блокируется до получения разрешения или до завершения контекста. При ctx.Done() (timeout upstream-запроса, cancel клиента) немедленно возвращает ctx.Err(). Горутина не зависает в ожидании слота.

Sizing — 80% от pool

R-RES-BH-3: maxConcurrent ≈ MaxIdleConnsPerHost × 0.8. Семафор должен срабатывать раньше исчерпания коннектов.

// adapters/out/sber/config.go
type SberClientConfig struct {
    ConnectTimeout time.Duration `envconfig:"SBER_CONNECT_TIMEOUT" default:"2s"`
    ReadTimeout    time.Duration `envconfig:"SBER_READ_TIMEOUT"    default:"10s"`
    CallTimeout    time.Duration `envconfig:"SBER_CALL_TIMEOUT"    default:"15s"`
    MaxConcurrent  int           `envconfig:"SBER_MAX_CONCURRENT"  default:"20"`
    BaseURL        string        `envconfig:"SBER_BASE_URL"        required:"true"`
}

func newSberHTTPClient(cfg SberClientConfig) *http.Client {
    return &http.Client{
        Timeout: cfg.CallTimeout,
        Transport: &http.Transport{
            DialContext:           (&net.Dialer{Timeout: cfg.ConnectTimeout}).DialContext,
            ResponseHeaderTimeout: cfg.ReadTimeout,
            MaxIdleConnsPerHost:   cfg.MaxConcurrent + 5, // +5 = запас pool (R-RES-ISO-2)
            IdleConnTimeout:       90 * time.Second,
        },
    }
}

При MaxConcurrent = 20 семафор создаётся с semaphore.NewWeighted(20). Transport получает MaxIdleConnsPerHost = 25. Семафор заполняется при 20 одновременных, pool — при 25. Зазор 5 слотов — буфер keep-alive.

Суммарно по всем системам: SberMaxConcurrent + InsuranceMaxConcurrent + ReceiptMaxConcurrent ≤ половина пула БД (R-RES-ISO-2). Внешние клиенты не должны съедать соединения с БД.

Маппинг ошибки переполнения

sem.Acquire возвращает ctx.Err() при двух сценариях:

  • Контекст истёк (upstream timeout, client cancel) — это не overflow, это отмена.
  • semaphore.Weighted не имеет встроенного «немедленного отказа» без ожидания. Чтобы получить fail-fast при заполненном семафоре, можно использовать неблокирующий TryAcquire:
// adapters/out/insurance/insurance_adapter.go
func (a *InsuranceAdapter) RequestCoverage(ctx context.Context, order Order) (CoverageRef, error) {
    if !a.sem.TryAcquire(1) {
        bulkheadRejectedTotal.WithLabelValues("insurance").Inc()
        return CoverageRef{}, &InsuranceUnavailableError{
            Cause: errors.New("bulkhead full: too many concurrent requests to insurance"),
        }
    }
    defer a.sem.Release(1)

    raw, err := a.breaker.Execute(func() (any, error) {
        callCtx, cancel := capTimeout(ctx, a.cfg.CallTimeout)
        defer cancel()
        return a.doRequestCoverage(callCtx, order)
    })
    if err != nil {
        if errors.Is(err, gobreaker.ErrOpenState) {
            return CoverageRef{}, &InsuranceUnavailableError{Cause: err}
        }
        return CoverageRef{}, fmt.Errorf("insurance request coverage: %w", err)
    }
    return raw.(CoverageRef), nil
}

TryAcquire(1) — неблокирующая попытка. Если семафор заполнен — возвращает false немедленно. Это строгий fail-fast: новые вызовы не ждут, пока освободится слот.

Выбор между Acquire(ctx, 1) и TryAcquire(1):

Acquire(ctx, 1)TryAcquire(1)
Поведение при заполненииждёт до ctx.Done()отказывает немедленно
Подходит длякоротких вызовов, где небольшое ожидание допустимострогий fail-fast
Рекомендацияoutbound к внешним системам под нагрузкойплатёжные системы, критичные пути

Три системы — три семафора

Каждый out-adapter держит свой semaphore.Weighted. Перегрузка Sber не затрагивает семафор Receipt или Insurance.

// adapters/out/receipt/receipt_adapter.go
type ReceiptAdapter struct {
    client  *http.Client
    breaker *gobreaker.CircuitBreaker
    sem     *semaphore.Weighted
    cfg     ReceiptClientConfig
}

func NewReceiptAdapter(cfg ReceiptClientConfig) *ReceiptAdapter {
    return &ReceiptAdapter{
        client:  newReceiptHTTPClient(cfg),
        breaker: newReceiptBreaker("receipt"),
        sem:     semaphore.NewWeighted(int64(cfg.MaxConcurrent)),
        cfg:     cfg,
    }
}

func (a *ReceiptAdapter) CreateReceipt(ctx context.Context, order Order) (ReceiptRef, error) {
    if !a.sem.TryAcquire(1) {
        bulkheadRejectedTotal.WithLabelValues("receipt").Inc()
        return ReceiptRef{}, &ReceiptSystemUnavailableError{
            Cause: errors.New("bulkhead full: receipt system"),
        }
    }
    defer a.sem.Release(1)
    // CB + HTTP-вызов
}

Имя системы в gobreaker.Settings.Name, в метриках (bulkhead_rejected_total{system="receipt"}), в логах — одинаковое. R-RES-ISO-3.

Observability bulkhead

R-RES-OBS-1: метрика bulkhead_rejected_total регистрирует каждый отказ семафора.

// adapters/out/metrics.go
var bulkheadRejectedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "bulkhead_rejected_total",
    Help: "Total requests rejected by bulkhead",
}, []string{"system"})

Дополнительно: текущее число занятых слотов отражает gauge bulkhead_active_calls:

var bulkheadActiveCalls = promauto.NewGaugeVec(prometheus.GaugeOpts{
    Name: "bulkhead_active_calls",
    Help: "Current number of concurrent calls held by bulkhead",
}, []string{"system"})

Инкремент при Acquire/TryAcquire, декремент в defer Release. Если bulkhead_active_calls{system="sber"} стабильно у лимита, а bulkhead_rejected_total растёт — система перегружена, пора пересматривать MaxConcurrent или смотреть на причину медленных ответов.

Что запрещено

АнтипаттернПравилоЧто взамен
errgroup.Group с ограниченным размером пула горутин как bulkhead для sync-кодаR-RES-BH-X1semaphore.Weighted — не теряет context.Context и OTel
Один semaphore.Weighted на все системыR-RES-ISO-1per-system семафор: sber, insurance, receipt — каждый свой
semaphore.NewWeighted(MaxIdleConnsPerHost) без запасаR-RES-BH-3× 0.8 — семафор должен сработать раньше pool
Bulkhead отсутствует на out-adapterR-RES-BH-1sem.Acquire / sem.TryAcquire обязателен на public-методе
sem.Acquire без инкремента метрики при отказеR-RES-OBS-1bulkheadRejectedTotal.WithLabelValues(system).Inc()

Куда дальше

  • Per-system isolation — *http.Client + *http.Transport на каждую систему, sizing pool.
  • Circuit Breaker — gobreaker.CircuitBreaker: другой слой, работает вместе с bulkhead.
  • Timeouts — context.WithTimeout, capTimeout, иерархия connect/read/call.
  • Fallback — что делать при ошибке sem.TryAcquire и InsuranceUnavailableError.
  • Observability — полный набор метрик: circuit_breaker_state, retry_attempts_total, bulkhead_rejected_total.
  • Конфигурация — envconfig-теги, per-system конфиг, MaxConcurrent.
  • Где какая защита — какой набор обязателен для outbound, internal, scheduler.
  • Health checks — TTL-кеш probe, /health/ready, per-system индикатор.
  • Retry — retry.Do с RetryIf; согласование с bulkhead: не ретраить при InsuranceUnavailableError.
  • Async и polling — task-queue вместо time.Sleep-цикла в handler'е.
  • OpenAPI generator binding — oapi-codegen, mapper generated → domain; bulkhead на public-методе адаптера, не на сгенерированном клиенте.