Опирается на правила:
R-RES-ASYNC-1…R-RES-ASYNC-3иR-RES-ASYNC-X1…R-RES-ASYNC-X2из Resilience Rules → раздел 11. Async и polling.
Важно знать
- Polling внешней системы — через task-queue в БД (
*_task+@Interval-poller), неawait sleep()-цикл в HTTP-handler.- Handler создаёт запись
<X>PollingTask(status=IN_PROGRESS,next_attempt_at=now()), возвращает клиенту 202 Accepted.@Interval-poller выбирает due-задачи черезFOR UPDATE SKIP LOCKED, опрашивает внешнюю систему, обновляет статус.- При успехе —
status=COMPLETED, продолжение бизнес-флоу через event/saga.- В методе адаптера
await sleep()допустим только при total wait<2s(короткий фиксированный backoff).- Для async-вызова с
Promise<T>—cockatiel timeout()илиAbortSignal.timeout()обязательны (retry/bulkhead/CB таймаут promise не ограничивают).await sleep()-цикл в async-handler — главный антипаттерн: держит HTTP-запрос открытым и копит pending-промисы. Под нагрузкой кладёт event loop по памяти.await sleep(N)где N > 5000ms — запах «должно быть task-queue».
Когда страховая компания или платёжная система не отвечает мгновенно — модель проста: «отправили запрос, теперь опрашиваем». В async-среде Node соблазн реализовать это через await sleep() в handler'е особенно велик: нет потоков, promise «дешёвый». Но promise висит пока жив HTTP-запрос, а HTTP-запрос держит сокет, event loop — память. Task-queue — это правильный способ. Раскрытие раздела 11 гайда.
Polling через task-queue
R-RES-ASYNC-1: классическая структура для polling-сценария в NestJS.
1. Schema task-таблицы
CREATE TABLE order_confirmation_task (
task_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
order_id BIGINT NOT NULL,
external_id TEXT,
status TEXT NOT NULL, -- PENDING / IN_PROGRESS / COMPLETED / FAILED
retry_count INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
WHERE status IN ('PENDING', 'IN_PROGRESS');
Partial index на активных статусах — COMPLETED и FAILED не участвуют в сканировании due-rows.
2. UseCase-handler ставит задачу
@Injectable()
export class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, CreateOrderResult> {
constructor(
private readonly orders: OrderRepository,
private readonly tasks: OrderConfirmationTaskRepository,
) {}
async handle(cmd: CreateOrderCommand): Promise<CreateOrderResult> {
const order = Order.createFor(cmd.customerId, cmd.items);
await this.orders.save(order);
const taskId = await this.tasks.enqueue({
orderId: order.id.value,
nextAttemptAt: new Date(),
payload: cmd,
});
return CreateOrderResult.queued(order.id, taskId); // → 202 Accepted
}
}
Контроллер маппит CreateOrderResult.queued в 202 Accepted с заголовком Location: /orders/{id} и телом { status: 'queued', orderId, taskId }.
3. Poller опрашивает due-tasks
@Injectable()
export class OrderConfirmationPoller {
constructor(
private readonly tasks: OrderConfirmationTaskRepository,
private readonly insurancePort: InsurancePort,
private readonly events: EventEmitter2,
) {}
@Interval(5_000)
async runDue(): Promise<void> {
const due = await this.tasks.findDueForProcessing(50); // FOR UPDATE SKIP LOCKED LIMIT 50
for (const task of due) {
try {
const status = await this.insurancePort.getConfirmationStatus(task.orderId);
if (status === 'CONFIRMED') {
await this.tasks.markCompleted(task.taskId);
this.events.emit('order.confirmed', { orderId: task.orderId });
} else {
await this.scheduleRetry(task, 'still pending');
}
} catch (e) {
await this.scheduleRetry(task, (e as Error).message);
}
}
}
private async scheduleRetry(task: OrderConfirmationTask, reason: string): Promise<void> {
const next = task.retryCount + 1;
if (next >= 20) {
await this.tasks.markFailed(task.taskId, reason);
this.logger.error({ taskId: task.taskId, attempts: next, reason }, 'task permanently failed');
return;
}
const backoffMs = Math.min(Math.pow(2, next) * 1_000, 300_000);
await this.tasks.scheduleRetry(task.taskId, next, new Date(Date.now() + backoffMs), reason);
}
}
Что важно:
findDueForProcessing(50)используетFOR UPDATE SKIP LOCKED LIMIT 50— несколько pod'ов берут разные задачи без конкуренции.- Exponential backoff с capped maximum (300s = 5 минут). После 20 попыток —
FAILED+ alert. - При успехе — domain event, бизнес-флоу продолжается асинхронно.
Подробно — PG Runtime → task-queue.
sleep допустим только до 2 секунд
R-RES-ASYNC-2: в методе адаптера await sleep() допустим только при total wait меньше 2 секунд.
// ОК — короткий fixed retry внутри адаптера
async getStatusWithShortWait(orderId: OrderId): Promise<OrderStatus> {
let status = await this.insuranceClient.getStatus(orderId.value);
if (status === 'PENDING') {
await sleep(500); // 500ms total wait
status = await this.insuranceClient.getStatus(orderId.value);
}
return toDomainStatus(status);
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
Граница 2s — это предел in-memory transient retry (см. Retry). Всё что дольше — task-queue.
timeout() для async-вызовов с Promise-возвратом
R-RES-ASYNC-3: retry(), bulkhead(), circuitBreaker() в cockatiel не ограничивают время жизни promise, который они оборачивают. Для этого нужен явный time-limiter.
@Injectable()
export class InsuranceAdapter implements InsurancePort {
private readonly policy = wrap(
retry(handleType(InsuranceTransientError), { maxAttempts: 3, backoff: new ExponentialBackoff() }),
circuitBreaker(handleAll, { halfOpenAfter: 30_000, breaker: new CountBreaker({ threshold: 0.5, size: 50 }) }),
bulkhead(6),
timeout(10_000, TimeoutStrategy.Aggressive), // time-limiter обязателен
);
async getConfirmationStatus(orderId: OrderId): Promise<ConfirmationStatus> {
try {
const resp = await this.policy.execute(
ctx => this.client.request({
path: `/orders/${orderId.value}/status`,
method: 'GET',
signal: ctx.signal,
}),
);
return toDomainStatus(await resp.body.json());
} catch (e) {
if (e instanceof BrokenCircuitError) throw InsurancePortError.systemUnavailable('insurance', e);
if (e instanceof TaskCancelledError) throw InsurancePortError.timeout('insurance', e);
throw InsurancePortError.from(e);
}
}
}
Или через AbortSignal.timeout() напрямую при одиночном вызове без политики:
const resp = await this.client.request({
path: `/orders/${orderId.value}/status`,
method: 'GET',
signal: AbortSignal.timeout(10_000),
});
Почему отдельный time-limiter:
retry()ограничивает число попыток, не суммарное время.bulkhead()ограничивает параллелизм, не длительность вызова.circuitBreaker()реагирует на ошибки, не на зависший promise.timeout(ms, TimeoutStrategy.Aggressive)явно прерывает вызов —ctx.signalпередаётся вrequest(), и undici отменяет запрос.
Для синхронного outbound (когда таймауты на уровне клиента: connectTimeout, headersTimeout, bodyTimeout в undici) timeout()-policy опциональна — если на уровне клиента уже задан нужный предел. При наличии дополнительного общего call-timeout — добавляйте.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
await sleep(N)-цикл в HTTP-handler (держит запрос, копит pending-промисы) | R-RES-ASYNC-X1 | Task-queue с @Interval-poller |
Любой await sleep(N) где N > 5000 | R-RES-ASYNC-X2 | Task-queue |
Async-вызов с Promise<T> без timeout() или AbortSignal.timeout() | R-RES-ASYNC-3 | timeout(ms, TimeoutStrategy.Aggressive) в composited policy |
Polling-task без FOR UPDATE SKIP LOCKED при нескольких pod'ах | R-RES-ASYNC-1 | SKIP LOCKED для параллельного разбора задач |
Возврат 200 OK за поставленную-в-очередь операцию | R-RES-ASYNC-1 | 202 Accepted + Location |
| Polling без max-retry-count (бесконечный цикл) | R-RES-ASYNC-1 | После N попыток → FAILED + alert |
Куда дальше
- Retry — граница in-memory retry vs task-queue;
ExponentialBackoffв cockatiel. - Где какая защита — schedulers и outbox через task-queue, не through-policy.
- Fallback — fallback с
202 Accepted+ task-queue для деградации. - Timeouts — иерархия
connect < headers < body < total; когда достаточно undici, когда нуженtimeout(). - Circuit Breaker —
BrokenCircuitError→ port-исключение → 503/409. - Bulkhead —
BulkheadRejectedError, sizing относительно connection pool. - Configuration — per-system конфиг, zod/class-validator,
R-RES-CFG-*. - Health Checks —
@nestjs/terminusиндикатор с TTL-кешем. - Observability — CB state-transitions через
prom-client, OTel-атрибуты. - OpenAPI Generator Binding — policy на адаптере, не на generated client.
- Per-system Isolation — отдельный
Agent/axios-инстанс на систему.