Опирается на правила:
R-RES-ISO-1…R-RES-ISO-3иR-RES-ISO-X1…R-RES-ISO-X2из Resilience Style Guide → раздел 2. Per-system isolation.
Важно знать
- На каждую внешнюю систему — отдельный undici
Agent(илиaxios.create(...)) с собственнымиconnections,connectTimeout,headersTimeout,bodyTimeout.- Cockatiel-policy (
wrap(retry, circuitBreaker, bulkhead, timeout)) тоже per-system и живёт как singleton в DI — не пересоздаётся на каждый вызов.- DI-токен и имя экземпляров совпадают:
SBER_CLIENT,sber— одно имя связывает клиент, policy-набор и health-индикатор.- Pool sizing:
connections ≈ maxConcurrent × 1.2; суммарно для всех систем ≤ половина пула БД.- Изоляция нужна затем, что зависание одной системы не должно блокировать другие: shared
Agent— главный антипаттерн.- axios по умолчанию использует
timeout: 0(бесконечность) и одинhttpAgent. Это всегда настраивается явно.- Node.js однопоточен, поэтому bulkhead реализован счётчиком cockatiel
bulkhead(N)— semaphore-семантика без отдельных тредов,AsyncLocalStorage(trace/MDC) не теряется.
Если один Agent обслуживает вызовы к Sber и к OdnaKassa, то зависание Sber занимает все connections в пуле — OdnaKassa не получит слот. Cascading failure: одна система останавливает остальных. Изоляция по системам — первое, что настраивается, когда сервис имеет больше одной внешней зависимости.
Отдельный Agent на каждую систему
R-RES-ISO-1: для каждой внешней системы — отдельный провайдер с собственным Agent (undici).
// adapters/out/sber/sber-client.provider.ts
import { Agent } from 'undici';
import { SBER_CLIENT } from './sber.tokens';
export const SberClientProvider = {
provide: SBER_CLIENT,
inject: [SberClientConfig],
useFactory: (cfg: SberClientConfig): Agent =>
new Agent({
connections: cfg.maxConcurrent, // pool size ≈ maxConcurrent (sizing ниже)
connectTimeout: cfg.connectTimeoutMs, // ms
headersTimeout: cfg.headersTimeoutMs,
bodyTimeout: cfg.bodyTimeoutMs,
}),
};
// adapters/out/receipt/receipt-client.provider.ts
export const ReceiptClientProvider = {
provide: RECEIPT_CLIENT,
inject: [ReceiptClientConfig],
useFactory: (cfg: ReceiptClientConfig): Agent =>
new Agent({
connections: cfg.maxConcurrent,
connectTimeout: cfg.connectTimeoutMs,
headersTimeout: cfg.headersTimeoutMs,
bodyTimeout: cfg.bodyTimeoutMs,
}),
};
Два адаптера — два Agent. Никаких shared-экземпляров между Sber и Receipt.
Для axios — та же идея через axios.create с собственным httpAgent:
import axios from 'axios';
import https from 'https';
export const OrderbookClientProvider = {
provide: ORDERBOOK_CLIENT,
inject: [OrderbookClientConfig],
useFactory: (cfg: OrderbookClientConfig) =>
axios.create({
baseURL: cfg.baseUrl,
timeout: cfg.totalTimeoutMs, // axios timeout = read-уровень; R-RES-TO-1
httpsAgent: new https.Agent({ maxSockets: cfg.maxConcurrent, keepAlive: true }),
}),
};
Sizing connection pool
R-RES-ISO-2: формула связывает внешние клиенты и пул БД.
- Per-system:
connections = Math.ceil(maxConcurrent * 1.2). Запас 20% покрывает keep-alive idle-соединения, которые удерживаются в пуле, пока bulkhead их уже не считает. - Total:
sum(all systems connections) ≤ pgPool.max / 2. Внешние клиенты не должны съедать файловые дескрипторы, необходимые для соединений с PostgreSQL.
Пример для сервиса с тремя внешними системами:
# config/default.yml
pg:
pool:
max: 40
client:
sber:
maxConcurrent: 16 # connections ≈ 20
receipt:
maxConcurrent: 8 # connections ≈ 10
insurance:
maxConcurrent: 8 # connections ≈ 10
# total ≈ 40 ≤ 40/2 = 20 — нарушение, уменьшаем maxConcurrent или увеличиваем pg.pool.max
Если суммарные connections превышают pgPool.max / 2 — либо снижается maxConcurrent по наименее загруженным системам, либо увеличивается pgPool.max.
Cockatiel-policy как singleton в DI
Cockatiel-policy создаётся один раз на систему и регистрируется в DI как singleton. Пересоздание policy на каждый вызов обнуляет накопленную статистику CB — скользящее окно теряет смысл.
// adapters/out/sber/sber-policy.provider.ts
import {
wrap, retry, circuitBreaker, bulkhead, timeout,
ExponentialBackoff, CountBreaker, handleType, TimeoutStrategy,
} from 'cockatiel';
import { SBER_POLICY } from './sber.tokens';
export const SberPolicyProvider = {
provide: SBER_POLICY,
inject: [SberClientConfig],
useFactory: (cfg: SberClientConfig) =>
wrap(
retry(handleType(SberTransientError), {
maxAttempts: cfg.retryMaxAttempts, // 3; только для read-методов (R-RES-RE-1)
backoff: new ExponentialBackoff(),
}),
circuitBreaker(handleAll, {
halfOpenAfter: cfg.cbHalfOpenAfterMs, // 30_000
breaker: new CountBreaker({ threshold: cfg.cbFailureThreshold, size: cfg.cbWindowSize }),
}),
bulkhead(cfg.maxConcurrent, cfg.bulkheadQueueLimit ?? 0),
timeout(cfg.totalTimeoutMs, TimeoutStrategy.Aggressive),
),
};
// adapters/out/sber/sber.adapter.ts
import { Inject, Injectable } from '@nestjs/common';
import { SBER_CLIENT, SBER_POLICY } from './sber.tokens';
import { BrokenCircuitError, BulkheadRejectedError, TaskCancelledError } from 'cockatiel';
import type { Agent } from 'undici';
import type { IPolicy } from 'cockatiel';
import { toSberDomain, toRegisterRequest } from './sber.mapper';
@Injectable()
export class SberAdapter implements PaymentPort {
constructor(
@Inject(SBER_CLIENT) private readonly client: Agent,
@Inject(SBER_POLICY) private readonly policy: IPolicy,
) {}
async findPayment(ref: PaymentRef): Promise<Payment> { // read → retry допустим (R-RES-RE-1)
try {
const resp = await this.policy.execute(() =>
this.client.request({ origin: this.baseUrl, path: `/payments/${ref.id}`, method: 'GET' }),
);
return toSberDomain(await resp.body.json() as SberPaymentDto);
} catch (e) {
if (e instanceof BrokenCircuitError) throw PaymentPortError.systemUnavailable('sber', e);
if (e instanceof BulkheadRejectedError) throw PaymentPortError.overloaded('sber', e);
if (e instanceof TaskCancelledError) throw PaymentPortError.timeout('sber', e);
throw PaymentPortError.from(e);
}
}
async registerPayment(cmd: RegisterPaymentCommand): Promise<PaymentRef> { // write + Idempotency-Key
try {
const resp = await this.policy.execute(() =>
this.client.request({
origin: this.baseUrl,
path: '/payments',
method: 'POST',
headers: { 'Idempotency-Key': cmd.idempotencyKey },
body: JSON.stringify(toRegisterRequest(cmd)),
}),
);
return toSberDomain(await resp.body.json() as SberRegisterDto);
} catch (e) {
if (e instanceof BrokenCircuitError) throw PaymentPortError.systemUnavailable('sber', e);
throw PaymentPortError.from(e);
}
}
}
Retry назначен всей policy, но метод registerPayment использует Idempotency-Key — retry безопасен (R-RES-RE-1). Если write-метод без Idempotency-Key — policy собирается без retry() в обёртке.
Единое имя для токена, policy и health-индикатора
R-RES-ISO-3: одно имя — sber, receipt, insurance — связывает DI-токен клиента, DI-токен policy и health-индикатор.
// adapters/out/sber/sber.tokens.ts
export const SBER_CLIENT = Symbol('SBER_CLIENT');
export const SBER_POLICY = Symbol('SBER_POLICY');
// adapters/out/sber/sber-health.indicator.ts
@Injectable()
export class SberHealthIndicator extends HealthIndicator {
private cached?: { up: boolean; at: number };
constructor(@Inject(SBER_CLIENT) private readonly client: Agent) {
super();
}
async isHealthy(): Promise<HealthIndicatorResult> {
if (!this.cached || Date.now() - this.cached.at > 30_000)
this.cached = { up: await this.probe(), at: Date.now() }; // TTL 30s (R-RES-HC-2)
return this.getStatus('sber', this.cached.up); // ключ = имя системы
}
private async probe(): Promise<boolean> {
try {
const resp = await this.client.request({ origin: this.baseUrl, path: '/health', method: 'GET' });
await resp.body.dump();
return resp.statusCode < 500;
} catch {
return false;
}
}
}
Имя 'sber' в getStatus совпадает с именем токена и конфигурационного ключа — в метриках, логах и алёртах одна строка на «единицу изоляции».
Пример для Customer-домена с несколькими системами
// Три системы, три независимых провайдера
@Module({
providers: [
SberClientProvider,
SberPolicyProvider,
SberAdapter,
SberHealthIndicator,
InsuranceClientProvider,
InsurancePolicyProvider,
InsuranceAdapter,
InsuranceHealthIndicator,
ReceiptClientProvider,
ReceiptPolicyProvider,
ReceiptAdapter,
ReceiptHealthIndicator,
],
exports: [SberAdapter, InsuranceAdapter, ReceiptAdapter],
})
export class CustomerAdaptersModule {}
Каждая система — четыре провайдера: клиент, policy, адаптер, health. Один модуль регистрирует их всех.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Один shared Agent или axios-экземпляр для нескольких систем | R-RES-ISO-X1 | Отдельный Agent/axios.create на каждую систему |
new Agent() без явных connections, connectTimeout, headersTimeout, bodyTimeout | R-RES-ISO-X2 | Явные параметры из типизированного конфига |
axios.create({ timeout: 0 }) или без timeout | R-RES-ISO-X2 | timeout: cfg.totalTimeoutMs (ненулевой) |
Разные имена DI-токена и health-индикатора для одной системы (SBER_CLIENT vs 'payment') | R-RES-ISO-3 | Единое имя: токен, policy, health — все sber |
Total connections всех систем > pgPool.max / 2 | R-RES-ISO-2 | sum(connections) ≤ pgPool.max / 2 |
| Cockatiel-policy пересоздаётся на каждый вызов (не singleton в DI) | R-RES-ISO-1 | Policy — singleton, регистрируется в DI один раз |
Куда дальше
- Таймауты — иерархия
connectTimeout/headersTimeout/bodyTimeout+ cockatieltimeout(). - Circuit Breaker —
CountBreaker, параметры скользящего окна, обработкаBrokenCircuitError. - Bulkhead —
bulkhead(maxConcurrent, queueLimit), sizing,BulkheadRejectedError. - Retry —
ExponentialBackoff, границы in-memory vs task-queue, идемпотентность. - Конфигурация — декларативный конфиг, zod/class-validator, per-system override.
- OpenAPI generator binding — где размещать policy-обёртки при generated client.
- Health checks —
@nestjs/terminuscustom indicator с TTL-кешем. - Observability —
prom-client, CB-события, структурный лог на переходы. - Fallback — кеш/частичный ответ, запреты для money-операций.
- Async и polling — task-queue вместо
sleep-цикла в handler. - Где ставить защиту — какие слои покрывать, какие нет.