Опирается на правила: R-RES-ASYNC-1R-RES-ASYNC-3 и R-RES-ASYNC-X1R-RES-ASYNC-X2 из Resilience Style Guide → раздел 11. Async и polling.

Важно знать

  • Polling внешней системы — через task-queue в БД, не Thread.sleep в синхронном handler.
  • Команда создаёт <X>PollingTask (status=IN_PROGRESS, next_attempt_at=now()+5s), возвращает клиенту 202 Accepted.
  • Scheduler (@Scheduled каждые 5s, FOR UPDATE SKIP LOCKED) дёргает внешнюю систему, обновляет статус.
  • При успехе — status=COMPLETED, продолжение бизнес-флоу через event/saga.
  • В sync-методе адаптера Thread.sleep допустим только если total wait <2s (короткий transient retry).
  • Для async outbound (CompletableFuture-возврат) — @TimeLimiter(name = "<system>") обязателен.
  • Thread.sleep в цикле в sync-handler — главный антипаттерн: блокирует worker-thread на N × iterations.
  • Thread.sleep > 5s — запах «должно было быть task-queue».

Когда внешняя система не отвечает мгновенно — например, асинхронно подтверждает платёж через webhook или требует polling статуса — нет соблазна большего, чем спросить «а просто подождать 30 секунд и ещё раз спросить». В sync-handler это убивает thread-pool сервиса. Task-queue — стандартный способ обойти это правильно. Раскрытие раздела 11 гайда.

Polling через task-queue

R-RES-ASYNC-1: классическая структура для polling-сценария.

1. Schema task-таблицы

CREATE TABLE order_confirmation_task (
    task_id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    order_id         BIGINT NOT NULL,
    external_id      TEXT,                  -- id из внешней системы
    status           TEXT NOT NULL,         -- PENDING / IN_PROGRESS / COMPLETED / FAILED
    retry_count      INTEGER NOT NULL DEFAULT 0,
    next_attempt_at  TIMESTAMPTZ NOT NULL,
    last_error       TEXT,
    payload          JSONB NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
    WHERE status IN ('PENDING', 'IN_PROGRESS');

WHERE в partial-index'е — оптимизация: COMPLETED и FAILED не мешают сканированию due-rows.

2. Command handler ставит задачу

@Component
@RequiredArgsConstructor
class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, CreateOrderResult> {

    private final OrderRepository orderRepository;
    private final OrderConfirmationTaskRepository taskRepository;

    @Override
    @Transactional
    public CreateOrderResult handle(CreateOrderCommand cmd) {
        Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
        orderRepository.save(order);

        Long taskId = taskRepository.enqueue(
            new OrderConfirmationTask(
                order.id().value(),
                Instant.now(),                          // next_attempt_at = сейчас
                Json.of(cmd)
            )
        );

        return CreateOrderResult.queued(order.id(), taskId);   // → 202 Accepted
    }
}

Контроллер маппит CreateOrderResult.queued в 202 Accepted с Location: /orders/{id} и body { "status": "queued", "order_id": ..., "task_id": ... }. См. REST API → 202 Accepted.

3. Scheduler опрашивает due-tasks

@Component
@RequiredArgsConstructor
public class OrderConfirmationScheduler {

    private final OrderConfirmationTaskRepository taskRepository;
    private final PaymentPort paymentPort;
    private final ApplicationEventPublisher events;

    @Scheduled(fixedDelay = 5_000)
    public void runDue() {
        List<OrderConfirmationTask> due = taskRepository.findDueForProcessing(50);
        for (OrderConfirmationTask t : due) {
            try {
                OrderStatus status = paymentPort.getStatus(t.orderId());
                if (status == OrderStatus.CONFIRMED) {
                    taskRepository.markCompleted(t.taskId());
                    events.publishEvent(new OrderConfirmedEvent(t.orderId()));
                } else {
                    scheduleRetry(t, "still pending");
                }
            } catch (Exception e) {
                scheduleRetry(t, e.getMessage());
            }
        }
    }

    private void scheduleRetry(OrderConfirmationTask t, String reason) {
        int next = t.retryCount() + 1;
        if (next >= 20) {
            taskRepository.markFailed(t.taskId(), reason);
            log.error("task {} failed after {} attempts: {}", t.taskId(), next, reason);
            return;
        }
        Duration backoff = Duration.ofSeconds((long) Math.min(Math.pow(2, next), 300));
        taskRepository.scheduleRetry(t.taskId(), next, Instant.now().plus(backoff), reason);
    }
}

Что важно:

  • findDueForProcessing(50) использует FOR UPDATE SKIP LOCKED LIMIT 50 — конкурирующие pod'ы берут разные задачи.
  • Exponential backoff с capped maximum (300s = 5 минут). После 20 attempts — FAILED + alert.
  • При успехе — публикуется domain event, бизнес-флоу продолжается асинхронно.

Подробно — PG Runtime → task-queue.

Thread.sleep допустим только до 2 секунд

R-RES-ASYNC-2: в sync-методе адаптера Thread.sleep допустим только если общее ожидание меньше 2 секунд.

// ОК — короткий fixed retry внутри adapter
public OrderStatus getStatusWithShortWait(OrderId orderId) {
    OrderStatus status = paymentPort.getStatus(orderId);
    if (status == OrderStatus.PENDING) {
        try {
            Thread.sleep(500);                  // 500ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        status = paymentPort.getStatus(orderId);
    }
    return status;
}

Логика: ≤2s — это в пределах обычного HTTP-вызова, упомянутый в Retry порог in-memory transient retry. Если больше — task-queue.

@TimeLimiter для CompletableFuture-возврата

R-RES-ASYNC-3: если адаптер возвращает CompletableFuture<T> (асинхронный outbound), @TimeLimiter(name = "<system>") обязателен.

@CircuitBreaker(name = "sber", fallbackMethod = "registerAsyncFallback")
@TimeLimiter(name = "sber")
public CompletableFuture<RegisterResult> registerAsync(RegisterCommand cmd) {
    return sberAsyncClient.register(toApiRequest(cmd))
        .thenApply(mapper::toDomain);
}
resilience4j.timelimiter.instances.sber:
  timeout-duration: 30s
  cancel-running-future: true

Почему отдельная аннотация:

  • @Retry, @Bulkhead, @CircuitBreaker работают с synchronous возвратом или с CompletionStage, но не отменяют долгий future.
  • @TimeLimiter гарантирует, что future будет отменён по timeout — иначе CompletableFuture может висеть бесконечно (зависит от транспорта).

Для синхронного outbound (return mapper.toDomain(executeCall(...))) @TimeLimiter не нужен — timeouts покрывает OkHttpClient.callTimeout.

Что запрещено

Thread.sleep в цикле в sync-handler

R-RES-ASYNC-X1: классический бойлерплейт «дождёмся, пока обработается».

// ПЛОХО — sleep-loop в sync-handler
@Override
@Transactional
public OrderStatus handle(ConfirmAndPollCommand cmd) {
    paymentPort.confirm(cmd.orderId());
    for (int i = 0; i < 30; i++) {                     // 30 итераций × 1 секунда = 30 секунд
        OrderStatus status = paymentPort.getStatus(cmd.orderId());
        if (status == OrderStatus.CONFIRMED) {
            return status;
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
    throw new TimeoutException("confirmation didn't arrive in 30s");
}

Что не так:

  • Worker-thread заблокирован 30 секунд. При 50 одновременных таких запросах — Tomcat thread-pool (200 потоков по умолчанию) исчерпается за 4 пика.
  • Upstream-вызов скорее всего уже отвалился по своему timeout (<30s).
  • При рестарте сервиса — все «ждущие» handler'ы отмирают, операции теряются.

Корректно: ставим polling-task в БД, возвращаем 202 Accepted, scheduler опрашивает.

Thread.sleep > 5 секунд

R-RES-ASYNC-X2: любой Thread.sleep больше 5 секунд — запах task-queue.

// ПЛОХО — длинный sleep в коде
public OrderStatus pollUntilReady(OrderId id) {
    OrderStatus status = paymentPort.getStatus(id);
    if (status == OrderStatus.PENDING) {
        Thread.sleep(15_000);                          // ← 15 секунд!
        status = paymentPort.getStatus(id);
    }
    return status;
}

Это — task-queue по форме («ждать и проверить»), но в виде блокирующего sync-кода. Переписать на:

  1. Поставить polling-task в БД.
  2. Вернуть 202 Accepted.
  3. Scheduler через 15 секунд проверит.

При scale нагрузки sync-вариант умрёт первым. Task-queue масштабируется горизонтально (несколько pod'ов с SKIP LOCKED).

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Thread.sleep(N) в цикле в sync-handlerR-RES-ASYNC-X1Task-queue с polling-scheduler
Любой Thread.sleep > 5sR-RES-ASYNC-X2Task-queue
CompletableFuture outbound без @TimeLimiterR-RES-ASYNC-3@TimeLimiter(name = "<system>")
Polling-task без SKIP LOCKED (или с одним consumer)R-RES-ASYNC-1FOR UPDATE SKIP LOCKED для распараллеливания
Возврат 200 OK за queued-операциюR-RES-ASYNC-1202 Accepted
Polling без max-retry-countR-RES-ASYNC-1После N attempts → FAILED + alert

Куда дальше