Опирается на правила:
R-RES-ASYNC-1…R-RES-ASYNC-3иR-RES-ASYNC-X1…R-RES-ASYNC-X2из Resilience Style Guide → раздел 11. Async и polling.
Важно знать
- Polling внешней системы — через task-queue в БД, не
Thread.sleepв синхронном handler.- Команда создаёт
<X>PollingTask(status=IN_PROGRESS,next_attempt_at=now()+5s), возвращает клиенту 202 Accepted.- Scheduler (
@Scheduledкаждые 5s,FOR UPDATE SKIP LOCKED) дёргает внешнюю систему, обновляет статус.- При успехе —
status=COMPLETED, продолжение бизнес-флоу через event/saga.- В sync-методе адаптера
Thread.sleepдопустим только если total wait<2s(короткий transient retry).- Для async outbound (
CompletableFuture-возврат) —@TimeLimiter(name = "<system>")обязателен.Thread.sleepв цикле в sync-handler — главный антипаттерн: блокирует worker-thread на N × iterations.Thread.sleep > 5s— запах «должно было быть task-queue».
Когда внешняя система не отвечает мгновенно — например, асинхронно подтверждает платёж через webhook или требует polling статуса — нет соблазна большего, чем спросить «а просто подождать 30 секунд и ещё раз спросить». В sync-handler это убивает thread-pool сервиса. Task-queue — стандартный способ обойти это правильно. Раскрытие раздела 11 гайда.
Polling через task-queue
R-RES-ASYNC-1: классическая структура для polling-сценария.
1. Schema task-таблицы
CREATE TABLE order_confirmation_task (
task_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
order_id BIGINT NOT NULL,
external_id TEXT, -- id из внешней системы
status TEXT NOT NULL, -- PENDING / IN_PROGRESS / COMPLETED / FAILED
retry_count INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL,
last_error TEXT,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ix_oct_due ON order_confirmation_task (status, next_attempt_at)
WHERE status IN ('PENDING', 'IN_PROGRESS');
WHERE в partial-index'е — оптимизация: COMPLETED и FAILED не мешают сканированию due-rows.
2. Command handler ставит задачу
@Component
@RequiredArgsConstructor
class CreateOrderHandler implements UseCaseHandler<CreateOrderCommand, CreateOrderResult> {
private final OrderRepository orderRepository;
private final OrderConfirmationTaskRepository taskRepository;
@Override
@Transactional
public CreateOrderResult handle(CreateOrderCommand cmd) {
Order order = orderFactory.createFor(cmd.customerId(), cmd.items());
orderRepository.save(order);
Long taskId = taskRepository.enqueue(
new OrderConfirmationTask(
order.id().value(),
Instant.now(), // next_attempt_at = сейчас
Json.of(cmd)
)
);
return CreateOrderResult.queued(order.id(), taskId); // → 202 Accepted
}
}
Контроллер маппит CreateOrderResult.queued в 202 Accepted с Location: /orders/{id} и body { "status": "queued", "order_id": ..., "task_id": ... }. См. REST API → 202 Accepted.
3. Scheduler опрашивает due-tasks
@Component
@RequiredArgsConstructor
public class OrderConfirmationScheduler {
private final OrderConfirmationTaskRepository taskRepository;
private final PaymentPort paymentPort;
private final ApplicationEventPublisher events;
@Scheduled(fixedDelay = 5_000)
public void runDue() {
List<OrderConfirmationTask> due = taskRepository.findDueForProcessing(50);
for (OrderConfirmationTask t : due) {
try {
OrderStatus status = paymentPort.getStatus(t.orderId());
if (status == OrderStatus.CONFIRMED) {
taskRepository.markCompleted(t.taskId());
events.publishEvent(new OrderConfirmedEvent(t.orderId()));
} else {
scheduleRetry(t, "still pending");
}
} catch (Exception e) {
scheduleRetry(t, e.getMessage());
}
}
}
private void scheduleRetry(OrderConfirmationTask t, String reason) {
int next = t.retryCount() + 1;
if (next >= 20) {
taskRepository.markFailed(t.taskId(), reason);
log.error("task {} failed after {} attempts: {}", t.taskId(), next, reason);
return;
}
Duration backoff = Duration.ofSeconds((long) Math.min(Math.pow(2, next), 300));
taskRepository.scheduleRetry(t.taskId(), next, Instant.now().plus(backoff), reason);
}
}
Что важно:
findDueForProcessing(50)используетFOR UPDATE SKIP LOCKED LIMIT 50— конкурирующие pod'ы берут разные задачи.- Exponential backoff с capped maximum (300s = 5 минут). После 20 attempts —
FAILED+ alert. - При успехе — публикуется domain event, бизнес-флоу продолжается асинхронно.
Подробно — PG Runtime → task-queue.
Thread.sleep допустим только до 2 секунд
R-RES-ASYNC-2: в sync-методе адаптера Thread.sleep допустим только если общее ожидание меньше 2 секунд.
// ОК — короткий fixed retry внутри adapter
public OrderStatus getStatusWithShortWait(OrderId orderId) {
OrderStatus status = paymentPort.getStatus(orderId);
if (status == OrderStatus.PENDING) {
try {
Thread.sleep(500); // 500ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
status = paymentPort.getStatus(orderId);
}
return status;
}
Логика: ≤2s — это в пределах обычного HTTP-вызова, упомянутый в Retry порог in-memory transient retry. Если больше — task-queue.
@TimeLimiter для CompletableFuture-возврата
R-RES-ASYNC-3: если адаптер возвращает CompletableFuture<T> (асинхронный outbound), @TimeLimiter(name = "<system>") обязателен.
@CircuitBreaker(name = "sber", fallbackMethod = "registerAsyncFallback")
@TimeLimiter(name = "sber")
public CompletableFuture<RegisterResult> registerAsync(RegisterCommand cmd) {
return sberAsyncClient.register(toApiRequest(cmd))
.thenApply(mapper::toDomain);
}
resilience4j.timelimiter.instances.sber:
timeout-duration: 30s
cancel-running-future: true
Почему отдельная аннотация:
@Retry,@Bulkhead,@CircuitBreakerработают с synchronous возвратом или сCompletionStage, но не отменяют долгий future.@TimeLimiterгарантирует, что future будет отменён по timeout — иначе CompletableFuture может висеть бесконечно (зависит от транспорта).
Для синхронного outbound (return mapper.toDomain(executeCall(...))) @TimeLimiter не нужен — timeouts покрывает OkHttpClient.callTimeout.
Что запрещено
Thread.sleep в цикле в sync-handler
R-RES-ASYNC-X1: классический бойлерплейт «дождёмся, пока обработается».
// ПЛОХО — sleep-loop в sync-handler
@Override
@Transactional
public OrderStatus handle(ConfirmAndPollCommand cmd) {
paymentPort.confirm(cmd.orderId());
for (int i = 0; i < 30; i++) { // 30 итераций × 1 секунда = 30 секунд
OrderStatus status = paymentPort.getStatus(cmd.orderId());
if (status == OrderStatus.CONFIRMED) {
return status;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
throw new TimeoutException("confirmation didn't arrive in 30s");
}
Что не так:
- Worker-thread заблокирован 30 секунд. При 50 одновременных таких запросах — Tomcat thread-pool (200 потоков по умолчанию) исчерпается за 4 пика.
- Upstream-вызов скорее всего уже отвалился по своему timeout (<30s).
- При рестарте сервиса — все «ждущие» handler'ы отмирают, операции теряются.
Корректно: ставим polling-task в БД, возвращаем 202 Accepted, scheduler опрашивает.
Thread.sleep > 5 секунд
R-RES-ASYNC-X2: любой Thread.sleep больше 5 секунд — запах task-queue.
// ПЛОХО — длинный sleep в коде
public OrderStatus pollUntilReady(OrderId id) {
OrderStatus status = paymentPort.getStatus(id);
if (status == OrderStatus.PENDING) {
Thread.sleep(15_000); // ← 15 секунд!
status = paymentPort.getStatus(id);
}
return status;
}
Это — task-queue по форме («ждать и проверить»), но в виде блокирующего sync-кода. Переписать на:
- Поставить polling-task в БД.
- Вернуть 202 Accepted.
- Scheduler через 15 секунд проверит.
При scale нагрузки sync-вариант умрёт первым. Task-queue масштабируется горизонтально (несколько pod'ов с SKIP LOCKED).
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Thread.sleep(N) в цикле в sync-handler | R-RES-ASYNC-X1 | Task-queue с polling-scheduler |
Любой Thread.sleep > 5s | R-RES-ASYNC-X2 | Task-queue |
CompletableFuture outbound без @TimeLimiter | R-RES-ASYNC-3 | @TimeLimiter(name = "<system>") |
Polling-task без SKIP LOCKED (или с одним consumer) | R-RES-ASYNC-1 | FOR UPDATE SKIP LOCKED для распараллеливания |
Возврат 200 OK за queued-операцию | R-RES-ASYNC-1 | 202 Accepted |
| Polling без max-retry-count | R-RES-ASYNC-1 | После N attempts → FAILED + alert |
Куда дальше
- Resilience → раздел 11. Async и polling — нормативные
R-RES-ASYNC-*. - Retry — граница in-memory retry vs task-queue.
- Где какая защита — schedulers через task-queue, не R4J.
- Fallback — fallback с 202 Accepted + task-queue.
- PG Runtime Style Guide — реализация task-queue.
- REST API Style Guide — 202 Accepted и async polling.
- Distributed Patterns Style Guide — saga с polling-task'ами.