← назад к разделу

Задача, которая есть в каждом втором сервисе: в таблице копятся записи (платежи, уведомления, экспорты), их нужно фоном обработать — проверить во внешней системе и перевести в финальный статус. Первое решение пишется за полчаса: @Scheduled-метод, выборка пачки, цикл. Оно даже работает — на демо, в один инстанс, пока внешняя система отвечает.

Эта статья — глубокий разбор задачи: сначала типичный первый подход и семь способов, которыми он сломается, затем три варианта взрослого решения и критерии выбора между ними.

Первый подход

Код, который встречается в проде чаще, чем хотелось бы (имена изменены):

@Component
@RequiredArgsConstructor
public class PaymentBatchJob {

    private final PaymentRepository paymentRepository;
    private final PaymentService paymentService;

    @Scheduled(fixedDelayString = "${jobs.payments.delay-ms:300000}")
    public void run() {
        List<Payment> batch = paymentRepository.findTop100ByStatusOrderByCreatedAtAsc(PaymentStatus.NEW);
        for (Payment payment : batch) {
            try {
                paymentService.process(payment.getId());
            } catch (Exception ex) {
                log.warn("Failed to process payment {}", payment.getId(), ex);
            }
        }
    }
}

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final RestTemplate restTemplate;

    public void process(UUID paymentId) {
        Payment payment = paymentRepository.findById(paymentId).orElseThrow();

        payment.setStatus(PaymentStatus.PROCESSING);
        paymentRepository.save(payment);

        LimitResponse response = restTemplate.postForObject(
            "http://limits/api/v1/limits/check",
            new LimitRequest(payment.getClientId(), payment.getAmount()),
            LimitResponse.class);

        if (response == null || !response.isAllowed()) {
            payment.setStatus(PaymentStatus.REJECTED);
            paymentRepository.save(payment);
            return;
        }
        markSent(payment);
    }

    @Transactional
    private void markSent(Payment payment) {
        payment.setStatus(PaymentStatus.SENT);
        payment.setSentAt(OffsetDateTime.now());
        paymentRepository.save(payment);
    }
}

Что здесь сломано

1. Голова очереди заклинивает навсегда. Платёж, упавший с исключением, остаётся в NEW. Выборка — findTop100...OrderByCreatedAtAsc: в следующий запуск джоба возьмёт те же 100 строк. Сотня платежей с битыми данными — и очередь встала: новые платежи никогда не доберутся до обработки, а битые будут молотиться вечно. Счётчика попыток нет, отложенного повтора нет, терминального статуса «не смогли» нет.

2. Зависшие PROCESSING невидимы. Сервис упал (deploy, OOM) между save(PROCESSING) и исходом — платёж остался в PROCESSING. Полер выбирает только NEW: зависший платёж не возьмёт уже никто и никогда. После каждого рестарта под нагрузкой таких копится несколько — и это инциденты, которые находят через неделю по жалобе клиента.

3. Второй инстанс — двойная обработка. Запустили две реплики сервиса — оба полера читают одни и те же 100 NEW-строк и обрабатывают их параллельно. Проверка лимитов уйдёт дважды, отправка — дважды. Для платежей это не «неприятно», это деньги. Ни блокировки выборки, ни распределённого лока, ни идемпотентности.

4. @Transactional молча не работает. markSentprivate и вызывается из того же класса: Spring-прокси не перехватывает ни private-методы, ни self-invocation (почему). Аннотация — декорация. А там, где транзакция действительно нужна — взятие платежа в работу, — её нет вовсе: каждый save коммитится сам по себе.

5. Один зависший HTTP-вызов останавливает всё. RestTemplate без настройки таймаутов ждёт ответ бесконечно. Обработка последовательная: сервис лимитов «задумался» на одном платеже — встали оставшиеся 99 и все следующие запуски (fixedDelay ждёт завершения предыдущего). Ни таймаута, ни retry, ни circuit breaker (что здесь должно быть).

6. Потолок производительности вшит в конфиг. 100 платежей раз в 5 минут — 28 800 в сутки максимум, при средней латентности обработки в две с половиной минуты. Рост объёма упрётся в этот потолок, и «подкрутить лимиты» не поможет — см. пункты 1–5.

7. Модель данных подыгрывает багам. double amount для денег, статус-строка, @Data на JPA-entity (ломает equals/hashCode на прокси). Это отдельный разговор — PG Types и Lombok-правила — но в платёжном коде такие мелочи стреляют первыми.

Итог: код не «плохо написан» — он решает другую задачу. Он написан как «перебрать строки», а задача — конкурентная очередь работ с гарантиями. Дальше — три способа решить её честно.

Вариант 1: очередь задач на PostgreSQL

Дефолтный ответ для большинства сервисов: данные уже в PG, транзакции уже есть — нужна правильная механика поверх. Три составляющие: статусная машина, конкурентный claim, ретраи.

Статусная машина с lease

ALTER TABLE payment
    ADD COLUMN attempts        int         NOT NULL DEFAULT 0,
    ADD COLUMN next_attempt_at timestamptz NOT NULL DEFAULT now(),
    ADD COLUMN claimed_until   timestamptz;

Статусы: NEW → PROCESSING → SENT | REJECTED | FAILED. Ключевая идея — PROCESSING не вечен: взятие в работу выдаёт lease (claimed_until = now() + интервал). Просроченный lease означает «воркер умер, задачу можно брать снова» — зависшие платежи из пункта 2 восстанавливаются сами, без ручного вмешательства.

Конкурентный claim: FOR UPDATE SKIP LOCKED

@Transactional
public List<UUID> claimBatch(int limit) {
    return jdbcClient.sql("""
            UPDATE payment SET
                status = 'PROCESSING',
                attempts = attempts + 1,
                claimed_until = now() + interval '5 minutes'
            WHERE id IN (
                SELECT id FROM payment
                WHERE (status = 'NEW' AND next_attempt_at <= now())
                   OR (status = 'PROCESSING' AND claimed_until < now())
                ORDER BY created_at
                LIMIT :limit
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id
            """)
        .param("limit", limit)
        .query(UUID.class)
        .list();
}

FOR UPDATE SKIP LOCKED — сердце паттерна: конкурирующие инстансы не ждут чужих блокировок, а пропускают занятые строки и берут следующие. Два, пять, десять реплик честно делят очередь без распределённых локов и ShedLock — пункт 3 закрыт самой базой. Claim — отдельная короткая транзакция: взяли пачку, проставили lease, закоммитили. Никакого HTTP внутри.

Обработка: вне транзакции, с изоляцией сбоев

@Scheduled(fixedDelayString = "${jobs.payments.delay-ms:5000}")
public void run() {
    List<UUID> claimed = claimService.claimBatch(100);
    claimed.forEach(id -> executor.submit(() -> worker.processOne(id)));
}

public void processOne(UUID paymentId) {
    try {
        LimitDecision decision = limitsClient.check(paymentId);
        finalizer.finish(paymentId, decision);
    } catch (Exception ex) {
        finalizer.scheduleRetry(paymentId, ex);
    }
}

Решения, зашитые здесь:

  • HTTP — между транзакциями, не внутри. Claim закоммичен до вызова, исход пишется отдельной транзакцией после. Соединение с БД не висит, пока внешний сервис думает.
  • limitsClient — не голый RestTemplate, а адаптер с таймаутами, retry на идемпотентной операции и circuit breaker (resilience-паттерны). Идемпотентность внешнего вызова — Idempotency-Key: paymentId: повтор после сбоя не спишет лимит дважды.
  • Параллелизм с ограничением. executor — пул фиксированного размера или семафор поверх виртуальных потоков: конкуренцию к сервису лимитов ограничиваем сознательно (bulkhead), а не «сколько платежей — столько потоков». Один медленный вызов больше не останавливает остальные 99.
  • fixedDelay — секунды, не минуты. Частый дешёвый опрос пустой очереди стоит один индексный SELECT; латентность падает с минут до секунд без роста нагрузки.

Ретраи с backoff и терминальный отказ

@Transactional
public void scheduleRetry(UUID paymentId, Exception ex) {
    var payment = load(paymentId);
    if (payment.attempts() >= MAX_ATTEMPTS) {
        payment.markFailed(ex.getMessage());
        meterRegistry.counter("payments.failed").increment();
    } else {
        payment.scheduleNextAttempt(backoff(payment.attempts()));
    }
}

Упавший платёж не возвращается в голову очереди — он уходит в хвост с экспоненциальной задержкой (next_attempt_at), а после N попыток получает терминальный FAILED с причиной и алертом. Пункт 1 закрыт: битые платежи не блокируют очередь и не молотятся вечно; на FAILED смотрит человек — это DLQ-семантика без брокера.

Этот вариант закрывает все семь пунктов и масштабируется до тысяч задач в минуту — потолок очереди на PG наступает сильно позже, чем принято думать (SKIP LOCKED и runtime-аспекты PG).

Вариант 2: Spring Batch

Spring Batch — это не «батч-джобы вообще», а фреймворк для конечных пакетных работ: chunk-ориентированный конвейер reader → processor → writer с JobRepository, в котором живёт состояние выполнения — и за счёт него restart с места падения, skip-policy и retry-policy на уровне chunk-ов, учёт прочитанного/записанного.

Когда он уместен: работа имеет начало и конец. Выгрузить реестр платежей за месяц в файл для банка, мигрировать десять миллионов строк, перетарифицировать всех клиентов раз в квартал. «Упало на 7-м миллионе — перезапустили, продолжило с 7-го» — ради этого Spring Batch и существует, и руками такое писать дорого.

Почему он не решает нашу задачу: поток платежей не имеет конца. Это не «джоб», а постоянная очередь; restart-семантика и JobRepository ничего не добавляют, а вопросы конкурентного claim, lease и идемпотентности остаются ровно теми же — Spring Batch их не решает, он решает другую проблему. Натянуть его на continuous-обработку можно (бесконечно перезапускаемый джоб) — получится вариант 1 с лишней инфраструктурой.

Практическое правило: конечный пакет → Spring Batch; бесконечный поток → очередь задач (вариант 1) или брокер (вариант 3).

Вариант 3: брокер сообщений

Платёж как сообщение: создание платежа публикует событие через outbox, воркеры — потребители очереди/топика с конкуренцией, ретраями и DLQ из коробки (RabbitMQ — нативно, Kafka — retry-топиками, разбор).

Что это даёт по сравнению с PG-очередью: независимое масштабирование воркеров (consumer group растёт без давления на прод-БД), готовая DLQ-механика, fan-out — если платёж интересен ещё и аналитике/нотификациям, событие уже есть. Чего не даёт: идемпотентность обработки всё равно ваша (at-least-once никуда не девается), плюс появляется брокер с его эксплуатацией и eventual consistency между БД и очередью.

Когда переходить: объёмы, при которых polling-нагрузка и конкуренция claim-ов на PG становятся заметными (десятки тысяч задач в минуту); больше одного типа потребителей; воркеры — отдельный сервис от продьюсера. До этого брокер для фоновой обработки — бритва Оккама: сущность сверх необходимого.

Критерии выбора

PG-очередь (вариант 1)Spring BatchБрокер
Природа работыБесконечный поток задачКонечный пакет с restartБесконечный поток, fan-out
ОбъёмДо тысяч задач/минМиллионы строк за прогонДесятки тысяч/мин и выше
Транзакционность claimРодная (SKIP LOCKED)Не про этоЧерез outbox + идемпотентность
Новая инфраструктураНетJobRepository-таблицыБрокер + мониторинг
DLQ / ретраиСвои (attempts + backoff)Skip/retry policyИз коробки
Когда братьДефолтВыгрузки, миграции, отчётыМасштаб, несколько потребителей

Антипаттерны исходного кода — сводно

АнтипаттернЧем кончаетсяЧто взамен
Выборка без блокировки в N инстансовДвойная обработка денегFOR UPDATE SKIP LOCKED в transactional claim
Упавшие задачи остаются в NEW в голове очередиОчередь заклинивает навсегдаattempts + next_attempt_at + терминальный FAILED
PROCESSING без leaseЗависшие задачи после рестартаclaimed_until + повторный claim просроченных
HTTP внутри обработки без таймаутовОдин вызов вешает весь батчТаймауты, CB, bulkhead; вызов между транзакциями
@Transactional на private / self-invocationТранзакции молча нетПубличный метод другого бина (механика)
Последовательный цикл по пачкеЛатентность и потолок throughputОграниченный параллелизм (пул / семафор + VT)
Редкий опрос большими пачкамиМинуты латентностиЧастый дешёвый опрос малыми пачками

Что почитать дальше

  • @Transactional глубоко — почему аннотация на private не работает и где границы транзакций.
  • Паттерны отказоустойчивости — таймауты, retry, circuit breaker, bulkhead для вызова лимитов.
  • Распределённые паттерны — outbox и идемпотентность для варианта с брокером.
  • Sync vs async — когда вообще уводить обработку в фон и события.
  • Scheduled, Async, виртуальные потоки — механика шедулера и пулов, на которой всё это стоит.