Опирается на правила:
R-RES-BH-1…R-RES-BH-3иR-RES-BH-X1из Resilience Rules → раздел 6. Bulkhead.
Важно знать
semaphore.NewWeighted(maxConcurrent)изgolang.org/x/sync/semaphore— обязательный слой отдельно отhttp.Transportconnection pool. Pool ограничивает TCP-соединения; семафор — одновременные вызовы.- Semaphore работает в той же горутине.
context.Context, OTel-трейс, логгер — всё сохраняется. Отдельного пула горутин не создаётся.- Sizing:
maxConcurrent ≈ MaxIdleConnsPerHost × 0.8— семафор срабатывает раньше исчерпания pool'а.sem.Acquire(ctx, 1)уважаетctx.Done()— при отмене контекста (timeout, cancel) немедленно возвращает ошибку, не блокирует горутину.- При переполнении
sem.Acquireвозвращаетctx.Err()— адаптер маппит в port-specific ошибку системной недоступности.- Bulkhead защищает сервис от исчерпания горутин-воркеров, когда внешняя система отвечает медленно, но CB ещё не открыт.
- Один
semaphore.Weighted— на одну систему.sber,insurance,receipt— каждая со своим семафором.
Если timeout — это «один зависший вызов не висит вечно», а Circuit Breaker — «после N ошибок поток прекращаем», то Bulkhead — «не более N одновременных в принципе». Три слоя работают в связке: timeout ограничивает длительность каждого вызова, bulkhead ограничивает их одновременное количество, CB останавливает поток при явной деградации. Раскрытие правил R-RES-BH-* для Go-стека.
Bulkhead и connection pool — два разных слоя
R-RES-BH-1: Bulkhead — не дубль connection pool. Это другой слой защиты с другим моментом срабатывания.
| Слой | Что ограничивает | Когда срабатывает |
|---|---|---|
http.Transport.MaxIdleConnsPerHost | TCP-соединения | при выдаче коннекта из pool |
semaphore.Weighted | одновременные вызовы | при входе в метод адаптера |
Разница в скорости отказа:
- При залипании вызова pool занят секунды — ждёт
CallTimeoutилиResponseHeaderTimeout. - Семафор отказывает новым вызовам сразу, как только лимит достигнут.
Пример: MaxIdleConnsPerHost = 25, maxConcurrent = 20. При 20 одновременных вызовах 21-й получает ошибку от sem.Acquire мгновенно, не создавая новую горутину для HTTP-запроса. 5 свободных слотов — буфер на переходные моменты (keep-alive соединения возвращаются в idle).
Semaphore-based: как устроен
R-RES-BH-2: semaphore.NewWeighted(maxConcurrent) из пакета golang.org/x/sync/semaphore.
// adapters/out/sber/sber_adapter.go
type SberAdapter struct {
client *http.Client
breaker *gobreaker.CircuitBreaker
sem *semaphore.Weighted
cfg SberClientConfig
}
func NewSberAdapter(cfg SberClientConfig) *SberAdapter {
return &SberAdapter{
client: newSberHTTPClient(cfg),
breaker: newSberBreaker("sber"),
sem: semaphore.NewWeighted(int64(cfg.MaxConcurrent)),
cfg: cfg,
}
}
Порядок слоёв в методе адаптера: сначала sem.Acquire (bulkhead), затем вход в breaker.Execute (CB), внутри — HTTP-вызов с контекстным таймаутом.
func (a *SberAdapter) Register(ctx context.Context, order Order) (PaymentRef, error) {
ctx, span := otel.Tracer("sber-adapter").Start(ctx, "SberAdapter.Register")
defer span.End()
span.SetAttributes(
attribute.String("external.system", "sber"),
attribute.String("circuit_breaker.state", a.breaker.State().String()),
)
if err := a.sem.Acquire(ctx, 1); err != nil {
bulkheadRejectedTotal.WithLabelValues("sber").Inc()
return PaymentRef{}, &PaymentSystemUnavailableError{System: "sber", Cause: err}
}
defer a.sem.Release(1)
raw, err := a.breaker.Execute(func() (any, error) {
callCtx, cancel := capTimeout(ctx, a.cfg.CallTimeout)
defer cancel()
return a.doRegister(callCtx, order)
})
if err != nil {
if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
return PaymentRef{}, &PaymentSystemUnavailableError{System: "sber", Cause: err}
}
return PaymentRef{}, fmt.Errorf("sber register: %w", err)
}
return raw.(PaymentRef), nil
}
sem.Acquire(ctx, 1) — блокируется до получения разрешения или до завершения контекста. При ctx.Done() (timeout upstream-запроса, cancel клиента) немедленно возвращает ctx.Err(). Горутина не зависает в ожидании слота.
Sizing — 80% от pool
R-RES-BH-3: maxConcurrent ≈ MaxIdleConnsPerHost × 0.8. Семафор должен срабатывать раньше исчерпания коннектов.
// adapters/out/sber/config.go
type SberClientConfig struct {
ConnectTimeout time.Duration `envconfig:"SBER_CONNECT_TIMEOUT" default:"2s"`
ReadTimeout time.Duration `envconfig:"SBER_READ_TIMEOUT" default:"10s"`
CallTimeout time.Duration `envconfig:"SBER_CALL_TIMEOUT" default:"15s"`
MaxConcurrent int `envconfig:"SBER_MAX_CONCURRENT" default:"20"`
BaseURL string `envconfig:"SBER_BASE_URL" required:"true"`
}
func newSberHTTPClient(cfg SberClientConfig) *http.Client {
return &http.Client{
Timeout: cfg.CallTimeout,
Transport: &http.Transport{
DialContext: (&net.Dialer{Timeout: cfg.ConnectTimeout}).DialContext,
ResponseHeaderTimeout: cfg.ReadTimeout,
MaxIdleConnsPerHost: cfg.MaxConcurrent + 5, // +5 = запас pool (R-RES-ISO-2)
IdleConnTimeout: 90 * time.Second,
},
}
}
При MaxConcurrent = 20 семафор создаётся с semaphore.NewWeighted(20). Transport получает MaxIdleConnsPerHost = 25. Семафор заполняется при 20 одновременных, pool — при 25. Зазор 5 слотов — буфер keep-alive.
Суммарно по всем системам: SberMaxConcurrent + InsuranceMaxConcurrent + ReceiptMaxConcurrent ≤ половина пула БД (R-RES-ISO-2). Внешние клиенты не должны съедать соединения с БД.
Маппинг ошибки переполнения
sem.Acquire возвращает ctx.Err() при двух сценариях:
- Контекст истёк (upstream timeout, client cancel) — это не overflow, это отмена.
semaphore.Weightedне имеет встроенного «немедленного отказа» без ожидания. Чтобы получить fail-fast при заполненном семафоре, можно использовать неблокирующийTryAcquire:
// adapters/out/insurance/insurance_adapter.go
func (a *InsuranceAdapter) RequestCoverage(ctx context.Context, order Order) (CoverageRef, error) {
if !a.sem.TryAcquire(1) {
bulkheadRejectedTotal.WithLabelValues("insurance").Inc()
return CoverageRef{}, &InsuranceUnavailableError{
Cause: errors.New("bulkhead full: too many concurrent requests to insurance"),
}
}
defer a.sem.Release(1)
raw, err := a.breaker.Execute(func() (any, error) {
callCtx, cancel := capTimeout(ctx, a.cfg.CallTimeout)
defer cancel()
return a.doRequestCoverage(callCtx, order)
})
if err != nil {
if errors.Is(err, gobreaker.ErrOpenState) {
return CoverageRef{}, &InsuranceUnavailableError{Cause: err}
}
return CoverageRef{}, fmt.Errorf("insurance request coverage: %w", err)
}
return raw.(CoverageRef), nil
}
TryAcquire(1) — неблокирующая попытка. Если семафор заполнен — возвращает false немедленно. Это строгий fail-fast: новые вызовы не ждут, пока освободится слот.
Выбор между Acquire(ctx, 1) и TryAcquire(1):
Acquire(ctx, 1) | TryAcquire(1) | |
|---|---|---|
| Поведение при заполнении | ждёт до ctx.Done() | отказывает немедленно |
| Подходит для | коротких вызовов, где небольшое ожидание допустимо | строгий fail-fast |
| Рекомендация | outbound к внешним системам под нагрузкой | платёжные системы, критичные пути |
Три системы — три семафора
Каждый out-adapter держит свой semaphore.Weighted. Перегрузка Sber не затрагивает семафор Receipt или Insurance.
// adapters/out/receipt/receipt_adapter.go
type ReceiptAdapter struct {
client *http.Client
breaker *gobreaker.CircuitBreaker
sem *semaphore.Weighted
cfg ReceiptClientConfig
}
func NewReceiptAdapter(cfg ReceiptClientConfig) *ReceiptAdapter {
return &ReceiptAdapter{
client: newReceiptHTTPClient(cfg),
breaker: newReceiptBreaker("receipt"),
sem: semaphore.NewWeighted(int64(cfg.MaxConcurrent)),
cfg: cfg,
}
}
func (a *ReceiptAdapter) CreateReceipt(ctx context.Context, order Order) (ReceiptRef, error) {
if !a.sem.TryAcquire(1) {
bulkheadRejectedTotal.WithLabelValues("receipt").Inc()
return ReceiptRef{}, &ReceiptSystemUnavailableError{
Cause: errors.New("bulkhead full: receipt system"),
}
}
defer a.sem.Release(1)
// CB + HTTP-вызов
}
Имя системы в gobreaker.Settings.Name, в метриках (bulkhead_rejected_total{system="receipt"}), в логах — одинаковое. R-RES-ISO-3.
Observability bulkhead
R-RES-OBS-1: метрика bulkhead_rejected_total регистрирует каждый отказ семафора.
// adapters/out/metrics.go
var bulkheadRejectedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "bulkhead_rejected_total",
Help: "Total requests rejected by bulkhead",
}, []string{"system"})
Дополнительно: текущее число занятых слотов отражает gauge bulkhead_active_calls:
var bulkheadActiveCalls = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "bulkhead_active_calls",
Help: "Current number of concurrent calls held by bulkhead",
}, []string{"system"})
Инкремент при Acquire/TryAcquire, декремент в defer Release. Если bulkhead_active_calls{system="sber"} стабильно у лимита, а bulkhead_rejected_total растёт — система перегружена, пора пересматривать MaxConcurrent или смотреть на причину медленных ответов.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
errgroup.Group с ограниченным размером пула горутин как bulkhead для sync-кода | R-RES-BH-X1 | semaphore.Weighted — не теряет context.Context и OTel |
Один semaphore.Weighted на все системы | R-RES-ISO-1 | per-system семафор: sber, insurance, receipt — каждый свой |
semaphore.NewWeighted(MaxIdleConnsPerHost) без запаса | R-RES-BH-3 | × 0.8 — семафор должен сработать раньше pool |
| Bulkhead отсутствует на out-adapter | R-RES-BH-1 | sem.Acquire / sem.TryAcquire обязателен на public-методе |
sem.Acquire без инкремента метрики при отказе | R-RES-OBS-1 | bulkheadRejectedTotal.WithLabelValues(system).Inc() |
Куда дальше
- Per-system isolation —
*http.Client+*http.Transportна каждую систему, sizing pool. - Circuit Breaker —
gobreaker.CircuitBreaker: другой слой, работает вместе с bulkhead. - Timeouts —
context.WithTimeout,capTimeout, иерархия connect/read/call. - Fallback — что делать при ошибке
sem.TryAcquireиInsuranceUnavailableError. - Observability — полный набор метрик:
circuit_breaker_state,retry_attempts_total,bulkhead_rejected_total. - Конфигурация —
envconfig-теги, per-system конфиг,MaxConcurrent. - Где какая защита — какой набор обязателен для outbound, internal, scheduler.
- Health checks — TTL-кеш probe,
/health/ready, per-system индикатор. - Retry —
retry.DoсRetryIf; согласование с bulkhead: не ретраить приInsuranceUnavailableError. - Async и polling — task-queue вместо
time.Sleep-цикла в handler'е. - OpenAPI generator binding —
oapi-codegen, mapper generated → domain; bulkhead на public-методе адаптера, не на сгенерированном клиенте.