Опирается на правила: R-RES-ASYNC-1R-RES-ASYNC-3 и R-RES-ASYNC-X1R-RES-ASYNC-X2 из Resilience Rules → раздел 11. Async и polling.

Важно знать

  • Polling внешней системы — через task-queue в БД (*_task + @Interval-poller), не await sleep()-цикл в HTTP-handler.
  • Handler создаёт запись <X>PollingTask (status=IN_PROGRESS, next_attempt_at=now()), возвращает клиенту 202 Accepted.
  • @Interval-poller выбирает due-задачи через FOR UPDATE SKIP LOCKED, опрашивает внешнюю систему, обновляет статус.
  • При успехе — status=COMPLETED, продолжение бизнес-флоу через event/saga.
  • В методе адаптера await sleep() допустим только при total wait <2s (короткий фиксированный backoff).
  • Для async-вызова с Promise<T>cockatiel timeout() или AbortSignal.timeout() обязательны (retry/bulkhead/CB таймаут promise не ограничивают).
  • await sleep()-цикл в async-handler — главный антипаттерн: держит HTTP-запрос открытым и копит pending-промисы. Под нагрузкой кладёт event loop по памяти.
  • await sleep(N) где N > 5000ms — запах «должно быть task-queue».

Когда страховая компания или платёжная система не отвечает мгновенно — модель проста: «отправили запрос, теперь опрашиваем». В async-среде Node соблазн реализовать это через await sleep() в handler'е особенно велик: нет потоков, promise «дешёвый». Но promise висит пока жив HTTP-запрос, а HTTP-запрос держит сокет, event loop — память. Task-queue — это правильный способ. Раскрытие раздела 11 гайда.

Polling через task-queue

R-RES-ASYNC-1: классическая структура для polling-сценария в NestJS.

1. Schema task-таблицы

CREATE TABLE order_confirmation_task (
    task_id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    order_id         BIGINT NOT NULL,
    external_id      TEXT,
    status           TEXT NOT NULL,          -- PENDING / IN_PROGRESS / COMPLETED / FAILED
    retry_count      INTEGER NOT NULL DEFAULT 0,
    next_attempt_at  TIMESTAMPTZ NOT NULL,
    last_error       TEXT,
    payload          JSONB NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
    WHERE status IN ('PENDING', 'IN_PROGRESS');

Partial index на активных статусах — COMPLETED и FAILED не участвуют в сканировании due-rows.

2. UseCase-handler ставит задачу

@Injectable()
export class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, CreateOrderResult> {
  constructor(
    private readonly orders: OrderRepository,
    private readonly tasks: OrderConfirmationTaskRepository,
  ) {}

  async handle(cmd: CreateOrderCommand): Promise<CreateOrderResult> {
    const order = Order.createFor(cmd.customerId, cmd.items);
    await this.orders.save(order);

    const taskId = await this.tasks.enqueue({
      orderId: order.id.value,
      nextAttemptAt: new Date(),
      payload: cmd,
    });

    return CreateOrderResult.queued(order.id, taskId);   // → 202 Accepted
  }
}

Контроллер маппит CreateOrderResult.queued в 202 Accepted с заголовком Location: /orders/{id} и телом { status: 'queued', orderId, taskId }.

3. Poller опрашивает due-tasks

@Injectable()
export class OrderConfirmationPoller {
  constructor(
    private readonly tasks: OrderConfirmationTaskRepository,
    private readonly insurancePort: InsurancePort,
    private readonly events: EventEmitter2,
  ) {}

  @Interval(5_000)
  async runDue(): Promise<void> {
    const due = await this.tasks.findDueForProcessing(50);  // FOR UPDATE SKIP LOCKED LIMIT 50

    for (const task of due) {
      try {
        const status = await this.insurancePort.getConfirmationStatus(task.orderId);
        if (status === 'CONFIRMED') {
          await this.tasks.markCompleted(task.taskId);
          this.events.emit('order.confirmed', { orderId: task.orderId });
        } else {
          await this.scheduleRetry(task, 'still pending');
        }
      } catch (e) {
        await this.scheduleRetry(task, (e as Error).message);
      }
    }
  }

  private async scheduleRetry(task: OrderConfirmationTask, reason: string): Promise<void> {
    const next = task.retryCount + 1;
    if (next >= 20) {
      await this.tasks.markFailed(task.taskId, reason);
      this.logger.error({ taskId: task.taskId, attempts: next, reason }, 'task permanently failed');
      return;
    }
    const backoffMs = Math.min(Math.pow(2, next) * 1_000, 300_000);
    await this.tasks.scheduleRetry(task.taskId, next, new Date(Date.now() + backoffMs), reason);
  }
}

Что важно:

  • findDueForProcessing(50) использует FOR UPDATE SKIP LOCKED LIMIT 50 — несколько pod'ов берут разные задачи без конкуренции.
  • Exponential backoff с capped maximum (300s = 5 минут). После 20 попыток — FAILED + alert.
  • При успехе — domain event, бизнес-флоу продолжается асинхронно.

Подробно — PG Runtime → task-queue.

sleep допустим только до 2 секунд

R-RES-ASYNC-2: в методе адаптера await sleep() допустим только при total wait меньше 2 секунд.

// ОК — короткий fixed retry внутри адаптера
async getStatusWithShortWait(orderId: OrderId): Promise<OrderStatus> {
  let status = await this.insuranceClient.getStatus(orderId.value);
  if (status === 'PENDING') {
    await sleep(500);                        // 500ms total wait
    status = await this.insuranceClient.getStatus(orderId.value);
  }
  return toDomainStatus(status);
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

Граница 2s — это предел in-memory transient retry (см. Retry). Всё что дольше — task-queue.

timeout() для async-вызовов с Promise-возвратом

R-RES-ASYNC-3: retry(), bulkhead(), circuitBreaker() в cockatiel не ограничивают время жизни promise, который они оборачивают. Для этого нужен явный time-limiter.

@Injectable()
export class InsuranceAdapter implements InsurancePort {
  private readonly policy = wrap(
    retry(handleType(InsuranceTransientError), { maxAttempts: 3, backoff: new ExponentialBackoff() }),
    circuitBreaker(handleAll, { halfOpenAfter: 30_000, breaker: new CountBreaker({ threshold: 0.5, size: 50 }) }),
    bulkhead(6),
    timeout(10_000, TimeoutStrategy.Aggressive),   // time-limiter обязателен
  );

  async getConfirmationStatus(orderId: OrderId): Promise<ConfirmationStatus> {
    try {
      const resp = await this.policy.execute(
        ctx => this.client.request({
          path: `/orders/${orderId.value}/status`,
          method: 'GET',
          signal: ctx.signal,
        }),
      );
      return toDomainStatus(await resp.body.json());
    } catch (e) {
      if (e instanceof BrokenCircuitError) throw InsurancePortError.systemUnavailable('insurance', e);
      if (e instanceof TaskCancelledError) throw InsurancePortError.timeout('insurance', e);
      throw InsurancePortError.from(e);
    }
  }
}

Или через AbortSignal.timeout() напрямую при одиночном вызове без политики:

const resp = await this.client.request({
  path: `/orders/${orderId.value}/status`,
  method: 'GET',
  signal: AbortSignal.timeout(10_000),
});

Почему отдельный time-limiter:

  • retry() ограничивает число попыток, не суммарное время.
  • bulkhead() ограничивает параллелизм, не длительность вызова.
  • circuitBreaker() реагирует на ошибки, не на зависший promise.
  • timeout(ms, TimeoutStrategy.Aggressive) явно прерывает вызов — ctx.signal передаётся в request(), и undici отменяет запрос.

Для синхронного outbound (когда таймауты на уровне клиента: connectTimeout, headersTimeout, bodyTimeout в undici) timeout()-policy опциональна — если на уровне клиента уже задан нужный предел. При наличии дополнительного общего call-timeout — добавляйте.

Что запрещено

АнтипаттернПравилоЧто взамен
await sleep(N)-цикл в HTTP-handler (держит запрос, копит pending-промисы)R-RES-ASYNC-X1Task-queue с @Interval-poller
Любой await sleep(N) где N > 5000R-RES-ASYNC-X2Task-queue
Async-вызов с Promise<T> без timeout() или AbortSignal.timeout()R-RES-ASYNC-3timeout(ms, TimeoutStrategy.Aggressive) в composited policy
Polling-task без FOR UPDATE SKIP LOCKED при нескольких pod'ахR-RES-ASYNC-1SKIP LOCKED для параллельного разбора задач
Возврат 200 OK за поставленную-в-очередь операциюR-RES-ASYNC-1202 Accepted + Location
Polling без max-retry-count (бесконечный цикл)R-RES-ASYNC-1После N попыток → FAILED + alert

Куда дальше

  • Retry — граница in-memory retry vs task-queue; ExponentialBackoff в cockatiel.
  • Где какая защита — schedulers и outbox через task-queue, не through-policy.
  • Fallback — fallback с 202 Accepted + task-queue для деградации.
  • Timeouts — иерархия connect < headers < body < total; когда достаточно undici, когда нужен timeout().
  • Circuit Breaker — BrokenCircuitError → port-исключение → 503/409.
  • Bulkhead — BulkheadRejectedError, sizing относительно connection pool.
  • Configuration — per-system конфиг, zod/class-validator, R-RES-CFG-*.
  • Health Checks — @nestjs/terminus индикатор с TTL-кешем.
  • Observability — CB state-transitions через prom-client, OTel-атрибуты.
  • OpenAPI Generator Binding — policy на адаптере, не на generated client.
  • Per-system Isolation — отдельный Agent/axios-инстанс на систему.