Распределённые паттерны

2PC, 3PC, SAGA, Transactional Outbox, CDC, Event Sourcing, Idempotent Consumer, Distributed Lock — согласованность данных в микросервисной архитектуре.

Распределённые паттерны

Все примеры в статье — на сквозном кейсе сайта: высоконагруженный маркетплейс. Когда монолит разбивается на микросервисы, исчезает главное удобство — единая транзакция базы данных. @Transactional больше не спасает, если заказ живёт в одном сервисе, оплата — в другом, а склад — в третьем. Нужны паттерны, которые обеспечивают согласованность данных через границы сервисов.

Проблема: распределённая транзакция

В монолите всё просто:

@Transactional
public void placeOrder(CreateOrderCommand cmd) {
    Order order = orderRepository.save(createOrder(cmd));
    paymentService.charge(order.getUserId(), order.getTotal());
    inventoryService.reserve(order.getItems());
    notificationService.sendConfirmation(order);
}

Один @Transactional — и либо всё произошло, либо ничего. Но в микросервисах orderRepository, paymentService, inventoryService — это разные базы данных, разные процессы, разные сети. Откатить чужую транзакцию нельзя. Сеть может упасть между вторым и третьим вызовом. Сервис может ответить, но ответ не дойдёт.

Two-Phase Commit (2PC)

Идея

Координатор спрашивает всех участников: «Готовы зафиксировать?» Если все ответили «да» — команда «фиксируй». Если хоть один ответил «нет» — команда «откатись».

diagram

Фаза 1 — Prepare (голосование)

Координатор отправляет каждому участнику команду PREPARE. Участник выполняет всю работу (блокирует ресурсы, пишет WAL), но не фиксирует. Отвечает VOTE_YES или VOTE_ABORT.

Фаза 2 — Commit / Rollback (решение)

Если все проголосовали YES — координатор шлёт COMMIT. Если хоть один ABORT — координатор шлёт ROLLBACK.

Пример на Java

public interface TransactionParticipant {
    boolean prepare(TransactionId txId, Object payload);
    void commit(TransactionId txId);
    void rollback(TransactionId txId);
}

@Component
@RequiredArgsConstructor
public class TwoPhaseCommitCoordinator {

    private final List<TransactionParticipant> participants;
    private final TransactionLogRepository txLog;

    public void execute(TransactionId txId, Object payload) {
        txLog.save(txId, TxStatus.STARTED);

        // Фаза 1: Prepare
        boolean allReady = true;
        for (TransactionParticipant p : participants) {
            try {
                if (!p.prepare(txId, payload)) {
                    allReady = false;
                    break;
                }
            } catch (Exception e) {
                allReady = false;
                break;
            }
        }

        // Фаза 2: Commit или Rollback
        if (allReady) {
            txLog.save(txId, TxStatus.COMMITTING);
            for (TransactionParticipant p : participants) {
                p.commit(txId);
            }
            txLog.save(txId, TxStatus.COMMITTED);
        } else {
            txLog.save(txId, TxStatus.ROLLING_BACK);
            for (TransactionParticipant p : participants) {
                p.rollback(txId);
            }
            txLog.save(txId, TxStatus.ROLLED_BACK);
        }
    }
}

Проблемы 2PC

  • Блокировки. Между PREPARE и COMMIT ресурсы заблокированы. Если координатор упал — участники зависли с заблокированными строками.
  • Единая точка отказа. Координатор обязан быть доступен. Его падение между фазами оставляет систему в неопределённом состоянии.
  • Latency. Два сетевых раунда минимум. Каждый участник ждёт остальных.
  • Масштабируемость. Чем больше участников, тем выше вероятность, что кто-то не ответит вовремя.

Когда применять

2PC работает в рамках одной инфраструктуры: несколько баз данных одного вендора, XA-транзакции через JTA. Для микросервисов через сеть — почти никогда.

Three-Phase Commit (3PC)

Зачем третья фаза

2PC страдает от blocking problem: если координатор упал после PREPARE, но до COMMIT — участники не знают, что делать. Ресурсы заблокированы, система зависла.

3PC добавляет промежуточную фазу PRE_COMMIT, которая даёт участникам достаточно информации для самостоятельного решения.

diagram
  • Фаза 1 — Can Commit? Координатор спрашивает участников, могут ли они выполнить транзакцию. Участники проверяют ресурсы, но ещё ничего не блокируют.
  • Фаза 2 — Pre-Commit. Координатор сообщает: все согласны. Участники фиксируют намерение и блокируют ресурсы. Теперь, если координатор пропадёт, участники знают, что все проголосовали YES, и могут самостоятельно зафиксировать по таймауту.
  • Фаза 3 — Do Commit. Финальная команда на фиксацию.

Почему 3PC редко используют

Три сетевых раунда вместо двух — ещё больше latency. И главное: 3PC не решает проблему сетевых разделений (network partition). Если участник получил PRE_COMMIT, а потом потерял связь с координатором и другими участниками — он зафиксирует по таймауту. Но если другой участник в это время откатился — данные рассогласованы.

На практике проблемы 2PC решают не через 3PC, а через SAGA + eventual consistency.

SAGA

Идея

Вместо одной большой транзакции — цепочка локальных транзакций. Каждый шаг фиксируется самостоятельно. Если шаг N упал — выполняются компенсирующие транзакции для шагов N-1, N-2, ..., 1.

Шаг 1: Создать заказ            → Компенсация: Отменить заказ
Шаг 2: Списать деньги           → Компенсация: Вернуть деньги
Шаг 3: Зарезервировать на складе → Компенсация: Снять резерв
Шаг 4: Отправить уведомление     → Компенсация: (не нужна)

Ключевое отличие от 2PC: промежуточные состояния видны другим сервисам. Между шагом 2 (деньги списаны) и шагом 3 (склад зарезервирован) система находится во временно несогласованном состоянии. Это eventual consistency.

Оркестрация

Центральный оркестратор управляет последовательностью шагов. Знает, какой шаг выполнить следующим, и какую компенсацию вызвать при ошибке.

public record SagaStep<T>(
    String name,
    Function<T, StepResult> action,
    Consumer<T> compensation
) {}

@Component
@RequiredArgsConstructor
public class CreateOrderSaga {

    private final OrderService orderService;
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final SagaLogRepository sagaLog;

    public OrderResult execute(CreateOrderRequest request) {
        SagaContext ctx = new SagaContext(request);
        List<SagaStep<SagaContext>> completedSteps = new ArrayList<>();

        List<SagaStep<SagaContext>> steps = List.of(
            new SagaStep<>("create-order",
                c -> orderService.create(c.getRequest()),
                c -> orderService.cancel(c.getOrderId())),

            new SagaStep<>("charge-payment",
                c -> paymentService.charge(c.getUserId(), c.getTotal()),
                c -> paymentService.refund(c.getPaymentId())),

            new SagaStep<>("reserve-inventory",
                c -> inventoryService.reserve(c.getOrderId(), c.getItems()),
                c -> inventoryService.releaseReservation(c.getOrderId()))
        );

        for (SagaStep<SagaContext> step : steps) {
            try {
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.STARTED);
                StepResult result = step.action().apply(ctx);
                ctx.apply(result);
                completedSteps.add(step);
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPLETED);
            } catch (Exception e) {
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.FAILED);
                compensate(ctx, completedSteps);
                throw new SagaException("Step failed: " + step.name(), e);
            }
        }

        return ctx.toResult();
    }

    private void compensate(SagaContext ctx,
                            List<SagaStep<SagaContext>> completedSteps) {
        ListIterator<SagaStep<SagaContext>> it =
            completedSteps.listIterator(completedSteps.size());

        while (it.hasPrevious()) {
            SagaStep<SagaContext> step = it.previous();
            try {
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATING);
                step.compensation().accept(ctx);
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATED);
            } catch (Exception e) {
                sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATION_FAILED);
                log.error("Compensation failed for step: {}", step.name(), e);
            }
        }
    }
}

Плюсы оркестрации. Логика в одном месте. Легко понять последовательность. Проще дебажить — смотришь лог оркестратора.

Минусы. Оркестратор — единая точка связности. Знает про все сервисы. Может вырасти в God Object.

Хореография

Нет центрального координатора. Каждый сервис слушает события и реагирует, публикуя свои.

diagram
// PaymentService слушает событие и реагирует
@Component
@RequiredArgsConstructor
public class OrderCreatedListener {

    private final PaymentService paymentService;
    private final EventPublisher eventPublisher;

    @KafkaListener(topics = "order-events")
    public void onOrderCreated(OrderCreatedEvent event) {
        try {
            PaymentResult result = paymentService.charge(
                event.userId(), event.total());
            eventPublisher.publish(new PaymentChargedEvent(
                event.orderId(), result.paymentId()));
        } catch (Exception e) {
            eventPublisher.publish(new PaymentFailedEvent(
                event.orderId(), e.getMessage()));
        }
    }
}

// OrderService слушает результаты и запускает компенсацию при ошибке
@Component
@RequiredArgsConstructor
public class PaymentResultListener {

    private final OrderService orderService;

    @KafkaListener(topics = "payment-events")
    public void onPaymentFailed(PaymentFailedEvent event) {
        orderService.cancel(event.orderId(),
            "Payment failed: " + event.reason());
    }
}

Плюсы хореографии. Слабая связность — сервисы не знают друг о друге. Каждый реагирует только на события.

Минусы. Логику саги сложно отследить — она размазана по сервисам. Циклические зависимости через события. Трудно понять, в каком состоянии находится процесс.

Оркестрация vs хореография

Оркестрация подходит, когда:

  • шагов больше 3-4
  • есть сложные ветвления (если оплата частичная — другой путь)
  • важно иметь единую точку для мониторинга и отладки
  • нужно гарантировать порядок шагов

Хореография подходит, когда:

  • шагов 2-3
  • сервисы слабо связаны и развиваются разными командами
  • линейный поток без ветвлений
  • нужна максимальная автономность сервисов

Правила проектирования компенсаций

Компенсация ≠ откат. refund() — это не rollback(). Это новая бизнес-транзакция, которая приводит систему к эквиваленту отмены. Деньги возвращаются отдельной операцией, а не отменой списания.

Идемпотентность обязательна. Компенсация может быть вызвана повторно (сеть, ретраи). Результат должен быть одинаковым:

public void refund(String paymentId) {
    Payment payment = paymentRepository.findById(paymentId)
        .orElseThrow();

    if (payment.getStatus() == PaymentStatus.REFUNDED) {
        return; // идемпотентность
    }

    paymentGateway.refund(payment.getGatewayId());
    payment.markRefunded();
    paymentRepository.save(payment);
}

Не все шаги требуют компенсации. Отправка email-уведомления — необратима, но и не критична. Можно отправить второе письмо «извините, заказ отменён».

Transactional Outbox

Проблема: dual write

Типичная ошибка — запись в БД и отправка события в брокер не атомарны:

@Transactional
public void createOrder(CreateOrderCommand cmd) {
    Order order = orderRepository.save(createOrder(cmd));
    // Что если приложение упадёт здесь? ↓
    kafkaTemplate.send("order-events", new OrderCreatedEvent(order.getId()));
}

Если приложение упало после save(), но до send() — заказ создан, а событие потеряно. Обратная ситуация тоже плоха: событие отправлено, а транзакция откатилась.

Решение: Outbox-таблица

Событие сохраняется в ту же БД, в той же транзакции, что и бизнес-данные. Отдельный процесс читает Outbox и отправляет события в брокер.

diagram
CREATE TABLE outbox_events (
    id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id   VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT now(),
    published_at   TIMESTAMP,
    retry_count    INT DEFAULT 0
);
@Component
@RequiredArgsConstructor
public class OutboxEventPublisher {

    private final OutboxRepository outboxRepository;

    @Transactional(propagation = Propagation.MANDATORY)
    public void save(String aggregateType, String aggregateId,
                     String eventType, Object payload) {
        OutboxEvent event = OutboxEvent.builder()
            .aggregateType(aggregateType)
            .aggregateId(aggregateId)
            .eventType(eventType)
            .payload(toJson(payload))
            .build();
        outboxRepository.save(event);
    }
}

// Бизнес-код: всё в одной транзакции
@Transactional
public Order createOrder(CreateOrderCommand cmd) {
    Order order = orderRepository.save(createOrder(cmd));
    outboxPublisher.save("Order", order.getId().toString(),
        "OrderCreated", new OrderCreatedPayload(order));
    return order;
}

Relay: доставка событий из Outbox в брокер

Polling Publisher — периодически опрашивает таблицу:

@Component
@RequiredArgsConstructor
public class OutboxPollingRelay {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafka;

    @Scheduled(fixedDelay = 500)
    @Transactional
    public void publishPending() {
        List<OutboxEvent> events = outboxRepository.findUnpublished(100);

        for (OutboxEvent event : events) {
            try {
                kafka.send(
                    event.getAggregateType().toLowerCase() + "-events",
                    event.getAggregateId(),
                    event.getPayload()
                ).get();

                event.markPublished();
                outboxRepository.save(event);
            } catch (Exception e) {
                event.incrementRetryCount();
                outboxRepository.save(event);
            }
        }
    }
}

Change Data Capture (CDC) — Debezium читает WAL базы данных и публикует изменения outbox-таблицы в Kafka напрямую. Без polling, без задержек, без дополнительной нагрузки на БД. Предпочтительный вариант для production.

Гарантии

Outbox обеспечивает at-least-once delivery: событие будет отправлено хотя бы один раз. Может быть отправлено дважды (relay упал после отправки, но до пометки published_at). Поэтому потребители обязаны быть идемпотентными.

Change Data Capture (CDC)

Идея

Вместо того чтобы сервис явно отправлял события, изменения в базе данных автоматически захватываются из WAL (Write-Ahead Log) и публикуются в брокер сообщений.

diagram

Преимущества перед явной отправкой событий

  • Нет dual write. Сервис пишет только в БД. Событие генерируется автоматически из WAL — невозможно потерять или отправить лишнее.
  • Нет изменений в коде приложения. Старый сервис, который ничего не знает про Kafka, автоматически начинает публиковать события.
  • Полная история. CDC захватывает каждое изменение, включая те, что сделаны напрямую в БД (миграции, ручные фиксы).

CDC + Outbox

CDC и Outbox — естественная пара. Сервис пишет в outbox-таблицу в транзакции с бизнес-данными. Debezium подхватывает INSERT из WAL и публикует в Kafka. Встроенный EventRouter трансформирует строку outbox в чистое событие — потребители не знают, что под капотом была промежуточная таблица.

Подводные камни

  • Порядок событий. CDC гарантирует порядок внутри одной строки (одного PK). Между разными строками порядок не гарантирован.
  • Schema evolution. Изменение структуры таблицы меняет формат событий. Нужен Schema Registry (Avro/Protobuf).
  • Operational overhead. Debezium — отдельный кластер, который нужно мониторить. Отставание коннектора, replication slot growth, WAL retention.

Event Sourcing

Идея

Вместо хранения текущего состояния — хранение последовательности событий, которые к этому состоянию привели. Текущее состояние вычисляется путём воспроизведения (replay) всех событий.

diagram

Традиционный подход vs Event Sourcing

Традиционный (State Sourcing): таблица orders содержит колонку status = 'CONFIRMED'. Предыдущие состояния потеряны — мы не знаем, когда заказ был создан, когда оплачен, сколько раз менялся статус.

Event Sourcing: таблица order_events содержит полную историю: OrderCreated → PaymentReceived → OrderConfirmed. Текущий статус вычисляется из цепочки.

Event Store

public interface EventStore {
    void append(String aggregateId, long expectedVersion, List<DomainEvent> events);
    List<DomainEvent> load(String aggregateId);
    List<DomainEvent> loadFrom(String aggregateId, long fromVersion);
}

@Repository
@RequiredArgsConstructor
public class JooqEventStore implements EventStore {

    private final DSLContext dsl;
    private final ObjectMapper mapper;

    @Override
    public void append(String aggregateId, long expectedVersion,
                       List<DomainEvent> events) {
        // Optimistic locking: проверяем, что никто не записал события
        // с момента нашего чтения
        Long currentVersion = dsl
            .select(DSL.max(EVENT_STORE.VERSION))
            .from(EVENT_STORE)
            .where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
            .fetchOneInto(Long.class);

        if (currentVersion != null && currentVersion != expectedVersion) {
            throw new OptimisticLockException(
                "Expected version " + expectedVersion
                + ", but was " + currentVersion);
        }

        long version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            dsl.insertInto(EVENT_STORE)
                .set(EVENT_STORE.AGGREGATE_ID, aggregateId)
                .set(EVENT_STORE.VERSION, version)
                .set(EVENT_STORE.EVENT_TYPE, event.getClass().getSimpleName())
                .set(EVENT_STORE.PAYLOAD, JSONB.jsonb(mapper.writeValueAsString(event)))
                .set(EVENT_STORE.CREATED_AT, OffsetDateTime.now())
                .execute();
        }
    }

    @Override
    public List<DomainEvent> load(String aggregateId) {
        return dsl.selectFrom(EVENT_STORE)
            .where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
            .orderBy(EVENT_STORE.VERSION.asc())
            .fetch(this::toDomainEvent);
    }
}

Агрегат с Event Sourcing

public class Order {

    private String id;
    private OrderStatus status;
    private Money total;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
    private long version = 0;

    // --- Восстановление из событий ---

    public static Order fromEvents(List<DomainEvent> events) {
        Order order = new Order();
        for (DomainEvent event : events) {
            order.apply(event);
            order.version++;
        }
        return order;
    }

    private void apply(DomainEvent event) {
        switch (event) {
            case OrderCreated e -> {
                this.id = e.orderId();
                this.status = OrderStatus.CREATED;
                this.total = e.total();
            }
            case PaymentReceived e -> this.status = OrderStatus.PAID;
            case OrderConfirmed e -> this.status = OrderStatus.CONFIRMED;
            case OrderCancelled e -> this.status = OrderStatus.CANCELLED;
            default -> throw new IllegalArgumentException(
                "Unknown event: " + event.getClass());
        }
    }

    // --- Команды генерируют события ---

    public void confirm() {
        if (this.status != OrderStatus.PAID) {
            throw new IllegalStateException(
                "Can only confirm PAID orders, current=" + status);
        }
        raise(new OrderConfirmed(this.id, OffsetDateTime.now()));
    }

    private void raise(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }
}

Проекции (Read Models)

Event Sourcing разделяет запись и чтение. Писать — в Event Store (append-only). Читать — из проекций, оптимизированных под конкретные запросы. Это естественное продолжение CQRS.

@Component
@RequiredArgsConstructor
public class OrderSummaryProjection {

    private final OrderSummaryRepository repository;

    @EventHandler
    public void on(OrderCreated event) {
        repository.save(new OrderSummary(
            event.orderId(), event.userId(),
            event.total(), "CREATED", event.createdAt()));
    }

    @EventHandler
    public void on(OrderConfirmed event) {
        repository.updateStatus(event.orderId(), "CONFIRMED");
    }

    @EventHandler
    public void on(OrderCancelled event) {
        repository.updateStatus(event.orderId(), "CANCELLED");
    }
}

Когда применять

Подходит:

  • Важна полная аудиторская история (финансы, compliance, расчёты с продавцами маркетплейса)
  • Нужна возможность «отмотать время» — посмотреть состояние на любой момент
  • Сложная доменная логика с множеством переходов состояний
  • Нужен temporal query: «какие заказы были в статусе PAID на 15 марта?»

Избыточен:

  • CRUD без сложной бизнес-логики
  • Простые справочники и настройки
  • Проект без требований к аудиту и истории

Подводные камни

  • Eventual consistency между Event Store и проекциями. Проекция обновляется асинхронно — пользователь может не увидеть свои изменения сразу.
  • Версионирование событий. Событие OrderCreated_v1 не содержит поля currency. В _v2 оно появилось. Нужна стратегия миграции: upcasting (преобразование старых событий при чтении) или versioned handlers.
  • Размер Event Store. Агрегат с 10 000 событий — replay занимает время. Решение — снэпшоты: периодически сохранять текущее состояние и replay только от последнего снэпшота.

Idempotent Consumer

Проблема

В распределённой системе сообщения приходят повторно: ретраи брокера, перебалансировка consumer group, at-least-once гарантии. Обработчик должен давать одинаковый результат при повторной обработке.

Решение: таблица обработанных событий

@Component
@RequiredArgsConstructor
public class IdempotentEventProcessor {

    private final ProcessedEventRepository processedEvents;

    @Transactional
    public <T> void process(String eventId, Supplier<T> handler) {
        if (processedEvents.existsById(eventId)) {
            log.info("Event already processed, skipping: {}", eventId);
            return;
        }
        handler.get();
        processedEvents.save(new ProcessedEvent(eventId, Instant.now()));
    }
}

@KafkaListener(topics = "payment-events")
public void onPaymentCharged(PaymentChargedEvent event) {
    idempotentProcessor.process(event.eventId(), () -> {
        Order order = orderRepository.findById(event.orderId())
            .orElseThrow();
        order.markPaid(event.paymentId());
        orderRepository.save(order);
        return null;
    });
}
CREATE TABLE processed_events (
    event_id     VARCHAR(255) PRIMARY KEY,
    processed_at TIMESTAMP NOT NULL DEFAULT now()
);

-- Периодическая очистка
DELETE FROM processed_events
WHERE processed_at < now() - INTERVAL '7 days';

Idempotency Key для API

Тот же принцип работает для REST API. Клиент генерирует уникальный ключ, сервер проверяет — обрабатывался ли запрос с таким ключом:

@PostMapping("/payments")
public ResponseEntity<PaymentResult> charge(
        @RequestHeader("Idempotency-Key") String idempotencyKey,
        @RequestBody ChargeRequest request) {

    // Если запрос с таким ключом уже обработан —
    // возвращаем сохранённый результат
    return idempotentProcessor.processOrReturn(idempotencyKey, () ->
        paymentService.charge(request));
}

Подробнее про Idempotency-Key — в REST API: заголовки.

Distributed Lock

Проблема

Два экземпляра одного сервиса одновременно обрабатывают один заказ. Оба списывают деньги — пользователь платит дважды. Или два relay одновременно читают outbox — событие публикуется дважды.

Решение: распределённая блокировка

Блокировка через Redis (Redisson):

@Component
@RequiredArgsConstructor
public class DistributedLockService {

    private final RedissonClient redisson;

    public <T> T executeWithLock(String lockKey, Duration timeout,
                                  Supplier<T> action) {
        RLock lock = redisson.getLock(lockKey);

        boolean acquired = lock.tryLock(
            timeout.toMillis(),     // время ожидания захвата
            timeout.toMillis() * 2, // auto-release через 2x таймаута
            TimeUnit.MILLISECONDS);

        if (!acquired) {
            throw new LockAcquisitionException(
                "Cannot acquire lock: " + lockKey);
        }

        try {
            return action.get();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

// Использование: обработка заказа под блокировкой
public void processOrder(Long orderId) {
    lockService.executeWithLock(
        "order-processing:" + orderId,
        Duration.ofSeconds(30),
        () -> {
            Order order = orderRepository.findById(orderId)
                .orElseThrow();
            order.confirm(operation);
            orderRepository.save(order);
            return null;
        });
}

Блокировка через БД (SELECT FOR UPDATE)

Если Redis нет — можно использовать базу данных:

// jOOQ: SELECT FOR UPDATE
public Optional<Order> findByIdForUpdate(Long id) {
    OrdersRecord record = dsl.selectFrom(ORDERS)
        .where(ORDERS.ID.eq(id))
        .forUpdate()
        .skipLocked()       // не ждать, если заблокировано
        .fetchOneInto(OrdersRecord.class);

    return Optional.ofNullable(record).map(mapper::toDomainOrder);
}

Подводные камни

  • Deadlock. Два процесса блокируют ресурсы в разном порядке. Решение — всегда блокировать в одном порядке (например, по ID).
  • Забытая блокировка. Процесс взял блокировку, упал, не отпустил. Решение — TTL (auto-release).
  • Split-brain в Redis. Redis Sentinel переключил мастер, а старый мастер ещё жив — два процесса держат одну блокировку. Решение — Redlock (блокировка на нескольких независимых нодах).

Как паттерны работают вместе

Реальная система не использует один паттерн в изоляции. Они складываются в пирамиду.

diagram

OrderService создаёт заказ и сохраняет событие через Outbox (атомарно). CDC доставляет событие в Kafka. PaymentService обрабатывает через Idempotent Consumer, защищает от конкурентной обработки через Distributed Lock, и (опционально) хранит историю через Event Sourcing. Вся цепочка — SAGA с компенсациями.

Чек-лист: выбор паттерна

ТребованиеПаттерн
Строгая согласованность сейчас2PC (только в рамках одной инфраструктуры)
Допустима eventual consistency, шагов > 3SAGA — оркестрация
Допустима eventual consistency, шагов 2-3SAGA — хореография
Гарантировать доставку событийTransactional Outbox + CDC
Полная история измененийEvent Sourcing
Сообщения приходят повторноIdempotent Consumer / Idempotency Key
Конкурентный доступ к ресурсуDistributed Lock (Redis/SELECT FOR UPDATE)

Итого

Распределённые системы не дают бесплатных гарантий. Каждый паттерн решает конкретную проблему согласованности данных:

  • 2PC / 3PC — строгая согласованность, ценой блокировок и хрупкости
  • SAGA — согласованность через компенсации, eventual consistency
  • Transactional Outbox — атомарная запись данных и событий
  • CDC — автоматический захват изменений из WAL
  • Event Sourcing — полная история как источник истины
  • Idempotent Consumer — безопасная повторная обработка
  • Distributed Lock — защита от конкурентного доступа

В реальной системе эти паттерны складываются в пирамиду: Outbox + CDC гарантируют доставку, SAGA управляет потоком, Idempotent Consumer защищает от дублей, Distributed Lock не даёт двум процессам одновременно менять один ресурс.

Ссылки