Почти в каждом приложении есть задача: накапливать записи в таблице и обрабатывать их в фоне. Платежи, уведомления, отчёты — что угодно. Первое решение пишется быстро: расписание, выборка пачки, цикл. Оно даже работает — до первого деплоя с двумя репликами или до первой ошибки во внешней системе.
Эта статья объясняет, что именно ломается и как сделать фоновую обработку надёжно.
Типичный первый подход
Вот код, который встречается в большинстве проектов:
@Component
public class PaymentBatchJob {
private final PaymentRepository repo;
private final PaymentService svc;
@Scheduled(fixedDelay = 300_000)
public void run() {
List<Payment> batch = repo.findTop100ByStatusOrderByCreatedAtAsc(PaymentStatus.NEW);
for (Payment payment : batch) {
try {
svc.process(payment.getId());
} catch (Exception ex) {
log.warn("Failed: {}", payment.getId(), ex);
}
}
}
}
Выглядит разумно: раз в 5 минут берём 100 записей со статусом NEW и обрабатываем. Но у этого кода есть несколько серьёзных проблем.
Что здесь ломается
Очередь застревает из-за одной битой записи
Выборка сортируется по дате создания: ORDER BY created_at ASC. Это значит, что при следующем запуске обработчик возьмёт те же самые записи, что и в прошлый раз.
Если один платёж падает с ошибкой каждый раз — он никуда не уходит. Он остаётся в начале очереди и вместе с сотней таких же «застрявших» записей блокирует все новые. Новые платежи накапливаются, но до обработки не доходят.
Счётчика попыток нет. Отложенного повтора нет. Терминального статуса «не вышло» нет.
Зависшие записи после перезапуска
Обработчик берёт запись и сразу ставит ей статус PROCESSING. Потом идёт во внешнюю систему. Если в этот момент сервис упал — обновление статуса в обратно в NEW уже не происходит. Запись остаётся в PROCESSING навсегда.
Обработчик выбирает только NEW, поэтому такую «зависшую» запись он больше не возьмёт. После каждого перезапуска под нагрузкой таких записей накапливается несколько — и никто о них не знает до жалобы от клиента.
Двойная обработка при двух репликах
Запустили два экземпляра сервиса — оба обработчика читают одни и те же 100 строк одновременно. Один платёж отправится дважды. Для уведомлений это неудобство, для платежей — серьёзный инцидент.
Никакой блокировки выборки нет, и несколько параллельных воркеров договориться между собой не могут.
Один зависший вызов останавливает всё
Обработка идёт последовательно: один платёж → запрос во внешнюю систему → следующий платёж. Если внешняя система «задумалась» на одном запросе — весь цикл встал и ждёт. Таймаутов нет. Остальные 99 платежей стоят в очереди.
Как сделать правильно: очередь задач на PostgreSQL
Для большинства случаев отдельный брокер сообщений не нужен — база данных уже есть, транзакции уже работают. Нужна правильная механика поверх.
Шаг 1: добавить нужные поля
ALTER TABLE payment
ADD COLUMN attempts int NOT NULL DEFAULT 0,
ADD COLUMN next_attempt_at timestamptz NOT NULL DEFAULT now(),
ADD COLUMN claimed_until timestamptz;
attempts— сколько раз пытались обработать.next_attempt_at— когда можно попробовать снова (для повторных попыток с задержкой).claimed_until— до какого момента запись «занята» воркером.
Шаг 2: атомарно захватить пачку
Главная идея — захват и обработка должны быть разными транзакциями. Захват — короткая операция, которая атомарно помечает записи как «взяты в работу» прямо в запросе UPDATE:
@Transactional
public List<UUID> claimBatch(int limit) {
return jdbcClient.sql("""
UPDATE payment SET
status = 'PROCESSING',
attempts = attempts + 1,
claimed_until = now() + interval '5 minutes'
WHERE id IN (
SELECT id FROM payment
WHERE (status = 'NEW' AND next_attempt_at <= now())
OR (status = 'PROCESSING' AND claimed_until < now())
ORDER BY created_at
LIMIT :limit
FOR UPDATE SKIP LOCKED
)
RETURNING id
""")
.param("limit", limit)
.query(UUID.class)
.list();
}
Ключевая строка — FOR UPDATE SKIP LOCKED. Это PostgreSQL-конструкция: несколько воркеров одновременно запускают этот запрос, но каждый пропускает уже заблокированные строки и берёт следующие свободные. Двойной обработки не будет — база данных сама распределяет работу между воркерами без какого-либо координатора.
claimed_until решает проблему «зависших» записей: если воркер умер до завершения, через 5 минут условие claimed_until < now() включит эту запись обратно в выборку.
Шаг 3: обработка вне транзакции
Захватили записи, закоммитили — теперь идём во внешнюю систему. HTTP-вызов происходит между транзакциями, а не внутри:
@Scheduled(fixedDelay = 5_000)
public void run() {
List<UUID> claimed = claimBatch(100);
claimed.forEach(id -> executor.submit(() -> processOne(id)));
}
public void processOne(UUID paymentId) {
try {
LimitDecision decision = limitsClient.check(paymentId);
finishSuccessfully(paymentId, decision);
} catch (Exception ex) {
scheduleRetry(paymentId, ex);
}
}
Несколько важных изменений по сравнению с первым подходом:
- Записи обрабатываются параллельно через пул потоков, а не последовательно. Один медленный вызов не блокирует остальные.
- Расписание теперь каждые 5 секунд, а не раз в 5 минут — пустой опрос стоит дёшево (один быстрый запрос), а задержка обработки падает с минут до секунд.
- HTTP-клиент
limitsClientдолжен иметь таймауты. Без явного таймаута один зависший вызов займёт поток из пула навсегда.
Шаг 4: повторные попытки с задержкой и терминальный отказ
Когда обработка падает — запись не остаётся в начале очереди. Она уходит в конец с задержкой:
@Transactional
public void scheduleRetry(UUID paymentId, Exception ex) {
Payment payment = load(paymentId);
if (payment.attempts() >= MAX_ATTEMPTS) {
payment.markFailed(ex.getMessage()); // терминальный статус FAILED
} else {
payment.scheduleNextAttempt(backoff(payment.attempts())); // возврат в NEW с задержкой
}
save(payment);
}
backoff — функция, которая увеличивает паузу с каждой попыткой: например, 1 мин → 2 мин → 4 мин → 8 мин. После N попыток запись получает статус FAILED с причиной ошибки — это сигнал человеку разобраться вручную. Очередь не застревает.
Когда нужен фреймворк пакетной обработки
Spring Batch, Quartz, Celery с beat — это инструменты для конечных задач: выгрузить реестр платежей за месяц в файл, мигрировать десять миллионов строк, перетарифицировать всех клиентов раз в квартал.
Их ключевая возможность — перезапуск с места сбоя: «упало на 7-м миллионе — перезапустили, продолжило с 7-го». Для этого фреймворк хранит состояние прогона и поддерживает политики пропуска и повтора на уровне порций данных.
Для нашей задачи (непрерывная очередь входящих платежей) такой фреймворк избыточен: поток не имеет конца, перезапуск с места не нужен, а вопросы параллельного захвата и повторных попыток остаются теми же — фреймворк их не решает.
Простое правило: конечный пакет с перезапуском → фреймворк; бесконечный поток задач → очередь задач на PostgreSQL.
Когда добавлять брокер сообщений
Брокер (RabbitMQ, Kafka) решает те же задачи, что и очередь на PostgreSQL, но добавляет новые возможности: несколько независимых типов потребителей одного события, масштабирование воркеров без нагрузки на основную базу данных, встроенные очереди недоставленных сообщений.
Переходить к брокеру имеет смысл, когда объёмы вырастают до десятков тысяч задач в минуту или когда одно событие должны обрабатывать несколько разных сервисов. До этого — брокер лишняя инфраструктура с эксплуатационными расходами.
Коротко
- Простой
@Scheduled-цикл ломается при параллельных репликах, зависших записях и последовательных HTTP-вызовах без таймаутов. - Правильный захват задач — атомарный UPDATE с
FOR UPDATE SKIP LOCKED: база сама распределяет работу между воркерами. claimed_until(«аренда задачи») защищает от зависших записей после перезапуска — просроченную аренду воркеры подберут автоматически.- HTTP-вызовы — между транзакциями, не внутри; обработка — параллельная через пул, не последовательная.
- Повторные попытки с нарастающей задержкой и терминальный статус
FAILEDне дают очереди застрять. - Фреймворк пакетной обработки — для конечных задач с перезапуском; брокер — для масштаба и нескольких потребителей.
Что почитать дальше
- Паттерны отказоустойчивости — таймауты, повторные попытки, автоматический выключатель для вызова внешних систем.
- Распределённые паттерны — transactional outbox и идемпотентность при работе с брокером.
@Transactionalв Spring — почему границы транзакций важны и где декларативное управление не срабатывает.