Распределённые паттерны
2PC, 3PC, SAGA, Transactional Outbox, CDC, Event Sourcing, Idempotent Consumer, Distributed Lock — согласованность данных в микросервисной архитектуре.
Все примеры в статье — на сквозном кейсе сайта: высоконагруженный маркетплейс. Когда монолит разбивается на микросервисы, исчезает главное удобство — единая транзакция базы данных.
@Transactionalбольше не спасает, если заказ живёт в одном сервисе, оплата — в другом, а склад — в третьем. Нужны паттерны, которые обеспечивают согласованность данных через границы сервисов.
Проблема: распределённая транзакция
В монолите всё просто:
@Transactional
public void placeOrder(CreateOrderCommand cmd) {
Order order = orderRepository.save(createOrder(cmd));
paymentService.charge(order.getUserId(), order.getTotal());
inventoryService.reserve(order.getItems());
notificationService.sendConfirmation(order);
}
Один @Transactional — и либо всё произошло, либо ничего. Но в микросервисах orderRepository, paymentService, inventoryService — это разные базы данных, разные процессы, разные сети. Откатить чужую транзакцию нельзя. Сеть может упасть между вторым и третьим вызовом. Сервис может ответить, но ответ не дойдёт.
Two-Phase Commit (2PC)
Идея
Координатор спрашивает всех участников: «Готовы зафиксировать?» Если все ответили «да» — команда «фиксируй». Если хоть один ответил «нет» — команда «откатись».
Фаза 1 — Prepare (голосование)
Координатор отправляет каждому участнику команду PREPARE. Участник выполняет всю работу (блокирует ресурсы, пишет WAL), но не фиксирует. Отвечает VOTE_YES или VOTE_ABORT.
Фаза 2 — Commit / Rollback (решение)
Если все проголосовали YES — координатор шлёт COMMIT. Если хоть один ABORT — координатор шлёт ROLLBACK.
Пример на Java
public interface TransactionParticipant {
boolean prepare(TransactionId txId, Object payload);
void commit(TransactionId txId);
void rollback(TransactionId txId);
}
@Component
@RequiredArgsConstructor
public class TwoPhaseCommitCoordinator {
private final List<TransactionParticipant> participants;
private final TransactionLogRepository txLog;
public void execute(TransactionId txId, Object payload) {
txLog.save(txId, TxStatus.STARTED);
// Фаза 1: Prepare
boolean allReady = true;
for (TransactionParticipant p : participants) {
try {
if (!p.prepare(txId, payload)) {
allReady = false;
break;
}
} catch (Exception e) {
allReady = false;
break;
}
}
// Фаза 2: Commit или Rollback
if (allReady) {
txLog.save(txId, TxStatus.COMMITTING);
for (TransactionParticipant p : participants) {
p.commit(txId);
}
txLog.save(txId, TxStatus.COMMITTED);
} else {
txLog.save(txId, TxStatus.ROLLING_BACK);
for (TransactionParticipant p : participants) {
p.rollback(txId);
}
txLog.save(txId, TxStatus.ROLLED_BACK);
}
}
}
Проблемы 2PC
- Блокировки. Между
PREPAREиCOMMITресурсы заблокированы. Если координатор упал — участники зависли с заблокированными строками. - Единая точка отказа. Координатор обязан быть доступен. Его падение между фазами оставляет систему в неопределённом состоянии.
- Latency. Два сетевых раунда минимум. Каждый участник ждёт остальных.
- Масштабируемость. Чем больше участников, тем выше вероятность, что кто-то не ответит вовремя.
Когда применять
2PC работает в рамках одной инфраструктуры: несколько баз данных одного вендора, XA-транзакции через JTA. Для микросервисов через сеть — почти никогда.
Three-Phase Commit (3PC)
Зачем третья фаза
2PC страдает от blocking problem: если координатор упал после PREPARE, но до COMMIT — участники не знают, что делать. Ресурсы заблокированы, система зависла.
3PC добавляет промежуточную фазу PRE_COMMIT, которая даёт участникам достаточно информации для самостоятельного решения.
- Фаза 1 — Can Commit? Координатор спрашивает участников, могут ли они выполнить транзакцию. Участники проверяют ресурсы, но ещё ничего не блокируют.
- Фаза 2 — Pre-Commit. Координатор сообщает: все согласны. Участники фиксируют намерение и блокируют ресурсы. Теперь, если координатор пропадёт, участники знают, что все проголосовали
YES, и могут самостоятельно зафиксировать по таймауту. - Фаза 3 — Do Commit. Финальная команда на фиксацию.
Почему 3PC редко используют
Три сетевых раунда вместо двух — ещё больше latency. И главное: 3PC не решает проблему сетевых разделений (network partition). Если участник получил PRE_COMMIT, а потом потерял связь с координатором и другими участниками — он зафиксирует по таймауту. Но если другой участник в это время откатился — данные рассогласованы.
На практике проблемы 2PC решают не через 3PC, а через SAGA + eventual consistency.
SAGA
Идея
Вместо одной большой транзакции — цепочка локальных транзакций. Каждый шаг фиксируется самостоятельно. Если шаг N упал — выполняются компенсирующие транзакции для шагов N-1, N-2, ..., 1.
Шаг 1: Создать заказ → Компенсация: Отменить заказ
Шаг 2: Списать деньги → Компенсация: Вернуть деньги
Шаг 3: Зарезервировать на складе → Компенсация: Снять резерв
Шаг 4: Отправить уведомление → Компенсация: (не нужна)
Ключевое отличие от 2PC: промежуточные состояния видны другим сервисам. Между шагом 2 (деньги списаны) и шагом 3 (склад зарезервирован) система находится во временно несогласованном состоянии. Это eventual consistency.
Оркестрация
Центральный оркестратор управляет последовательностью шагов. Знает, какой шаг выполнить следующим, и какую компенсацию вызвать при ошибке.
public record SagaStep<T>(
String name,
Function<T, StepResult> action,
Consumer<T> compensation
) {}
@Component
@RequiredArgsConstructor
public class CreateOrderSaga {
private final OrderService orderService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final SagaLogRepository sagaLog;
public OrderResult execute(CreateOrderRequest request) {
SagaContext ctx = new SagaContext(request);
List<SagaStep<SagaContext>> completedSteps = new ArrayList<>();
List<SagaStep<SagaContext>> steps = List.of(
new SagaStep<>("create-order",
c -> orderService.create(c.getRequest()),
c -> orderService.cancel(c.getOrderId())),
new SagaStep<>("charge-payment",
c -> paymentService.charge(c.getUserId(), c.getTotal()),
c -> paymentService.refund(c.getPaymentId())),
new SagaStep<>("reserve-inventory",
c -> inventoryService.reserve(c.getOrderId(), c.getItems()),
c -> inventoryService.releaseReservation(c.getOrderId()))
);
for (SagaStep<SagaContext> step : steps) {
try {
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.STARTED);
StepResult result = step.action().apply(ctx);
ctx.apply(result);
completedSteps.add(step);
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPLETED);
} catch (Exception e) {
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.FAILED);
compensate(ctx, completedSteps);
throw new SagaException("Step failed: " + step.name(), e);
}
}
return ctx.toResult();
}
private void compensate(SagaContext ctx,
List<SagaStep<SagaContext>> completedSteps) {
ListIterator<SagaStep<SagaContext>> it =
completedSteps.listIterator(completedSteps.size());
while (it.hasPrevious()) {
SagaStep<SagaContext> step = it.previous();
try {
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATING);
step.compensation().accept(ctx);
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATED);
} catch (Exception e) {
sagaLog.save(ctx.getSagaId(), step.name(), StepStatus.COMPENSATION_FAILED);
log.error("Compensation failed for step: {}", step.name(), e);
}
}
}
}
Плюсы оркестрации. Логика в одном месте. Легко понять последовательность. Проще дебажить — смотришь лог оркестратора.
Минусы. Оркестратор — единая точка связности. Знает про все сервисы. Может вырасти в God Object.
Хореография
Нет центрального координатора. Каждый сервис слушает события и реагирует, публикуя свои.
// PaymentService слушает событие и реагирует
@Component
@RequiredArgsConstructor
public class OrderCreatedListener {
private final PaymentService paymentService;
private final EventPublisher eventPublisher;
@KafkaListener(topics = "order-events")
public void onOrderCreated(OrderCreatedEvent event) {
try {
PaymentResult result = paymentService.charge(
event.userId(), event.total());
eventPublisher.publish(new PaymentChargedEvent(
event.orderId(), result.paymentId()));
} catch (Exception e) {
eventPublisher.publish(new PaymentFailedEvent(
event.orderId(), e.getMessage()));
}
}
}
// OrderService слушает результаты и запускает компенсацию при ошибке
@Component
@RequiredArgsConstructor
public class PaymentResultListener {
private final OrderService orderService;
@KafkaListener(topics = "payment-events")
public void onPaymentFailed(PaymentFailedEvent event) {
orderService.cancel(event.orderId(),
"Payment failed: " + event.reason());
}
}
Плюсы хореографии. Слабая связность — сервисы не знают друг о друге. Каждый реагирует только на события.
Минусы. Логику саги сложно отследить — она размазана по сервисам. Циклические зависимости через события. Трудно понять, в каком состоянии находится процесс.
Оркестрация vs хореография
Оркестрация подходит, когда:
- шагов больше 3-4
- есть сложные ветвления (если оплата частичная — другой путь)
- важно иметь единую точку для мониторинга и отладки
- нужно гарантировать порядок шагов
Хореография подходит, когда:
- шагов 2-3
- сервисы слабо связаны и развиваются разными командами
- линейный поток без ветвлений
- нужна максимальная автономность сервисов
Правила проектирования компенсаций
Компенсация ≠ откат. refund() — это не rollback(). Это новая бизнес-транзакция, которая приводит систему к эквиваленту отмены. Деньги возвращаются отдельной операцией, а не отменой списания.
Идемпотентность обязательна. Компенсация может быть вызвана повторно (сеть, ретраи). Результат должен быть одинаковым:
public void refund(String paymentId) {
Payment payment = paymentRepository.findById(paymentId)
.orElseThrow();
if (payment.getStatus() == PaymentStatus.REFUNDED) {
return; // идемпотентность
}
paymentGateway.refund(payment.getGatewayId());
payment.markRefunded();
paymentRepository.save(payment);
}
Не все шаги требуют компенсации. Отправка email-уведомления — необратима, но и не критична. Можно отправить второе письмо «извините, заказ отменён».
Transactional Outbox
Проблема: dual write
Типичная ошибка — запись в БД и отправка события в брокер не атомарны:
@Transactional
public void createOrder(CreateOrderCommand cmd) {
Order order = orderRepository.save(createOrder(cmd));
// Что если приложение упадёт здесь? ↓
kafkaTemplate.send("order-events", new OrderCreatedEvent(order.getId()));
}
Если приложение упало после save(), но до send() — заказ создан, а событие потеряно. Обратная ситуация тоже плоха: событие отправлено, а транзакция откатилась.
Решение: Outbox-таблица
Событие сохраняется в ту же БД, в той же транзакции, что и бизнес-данные. Отдельный процесс читает Outbox и отправляет события в брокер.
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now(),
published_at TIMESTAMP,
retry_count INT DEFAULT 0
);
@Component
@RequiredArgsConstructor
public class OutboxEventPublisher {
private final OutboxRepository outboxRepository;
@Transactional(propagation = Propagation.MANDATORY)
public void save(String aggregateType, String aggregateId,
String eventType, Object payload) {
OutboxEvent event = OutboxEvent.builder()
.aggregateType(aggregateType)
.aggregateId(aggregateId)
.eventType(eventType)
.payload(toJson(payload))
.build();
outboxRepository.save(event);
}
}
// Бизнес-код: всё в одной транзакции
@Transactional
public Order createOrder(CreateOrderCommand cmd) {
Order order = orderRepository.save(createOrder(cmd));
outboxPublisher.save("Order", order.getId().toString(),
"OrderCreated", new OrderCreatedPayload(order));
return order;
}
Relay: доставка событий из Outbox в брокер
Polling Publisher — периодически опрашивает таблицу:
@Component
@RequiredArgsConstructor
public class OutboxPollingRelay {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafka;
@Scheduled(fixedDelay = 500)
@Transactional
public void publishPending() {
List<OutboxEvent> events = outboxRepository.findUnpublished(100);
for (OutboxEvent event : events) {
try {
kafka.send(
event.getAggregateType().toLowerCase() + "-events",
event.getAggregateId(),
event.getPayload()
).get();
event.markPublished();
outboxRepository.save(event);
} catch (Exception e) {
event.incrementRetryCount();
outboxRepository.save(event);
}
}
}
}
Change Data Capture (CDC) — Debezium читает WAL базы данных и публикует изменения outbox-таблицы в Kafka напрямую. Без polling, без задержек, без дополнительной нагрузки на БД. Предпочтительный вариант для production.
Гарантии
Outbox обеспечивает at-least-once delivery: событие будет отправлено хотя бы один раз. Может быть отправлено дважды (relay упал после отправки, но до пометки published_at). Поэтому потребители обязаны быть идемпотентными.
Change Data Capture (CDC)
Идея
Вместо того чтобы сервис явно отправлял события, изменения в базе данных автоматически захватываются из WAL (Write-Ahead Log) и публикуются в брокер сообщений.
Преимущества перед явной отправкой событий
- Нет dual write. Сервис пишет только в БД. Событие генерируется автоматически из WAL — невозможно потерять или отправить лишнее.
- Нет изменений в коде приложения. Старый сервис, который ничего не знает про Kafka, автоматически начинает публиковать события.
- Полная история. CDC захватывает каждое изменение, включая те, что сделаны напрямую в БД (миграции, ручные фиксы).
CDC + Outbox
CDC и Outbox — естественная пара. Сервис пишет в outbox-таблицу в транзакции с бизнес-данными. Debezium подхватывает INSERT из WAL и публикует в Kafka. Встроенный EventRouter трансформирует строку outbox в чистое событие — потребители не знают, что под капотом была промежуточная таблица.
Подводные камни
- Порядок событий. CDC гарантирует порядок внутри одной строки (одного PK). Между разными строками порядок не гарантирован.
- Schema evolution. Изменение структуры таблицы меняет формат событий. Нужен Schema Registry (Avro/Protobuf).
- Operational overhead. Debezium — отдельный кластер, который нужно мониторить. Отставание коннектора, replication slot growth, WAL retention.
Event Sourcing
Идея
Вместо хранения текущего состояния — хранение последовательности событий, которые к этому состоянию привели. Текущее состояние вычисляется путём воспроизведения (replay) всех событий.
Традиционный подход vs Event Sourcing
Традиционный (State Sourcing): таблица orders содержит колонку status = 'CONFIRMED'. Предыдущие состояния потеряны — мы не знаем, когда заказ был создан, когда оплачен, сколько раз менялся статус.
Event Sourcing: таблица order_events содержит полную историю: OrderCreated → PaymentReceived → OrderConfirmed. Текущий статус вычисляется из цепочки.
Event Store
public interface EventStore {
void append(String aggregateId, long expectedVersion, List<DomainEvent> events);
List<DomainEvent> load(String aggregateId);
List<DomainEvent> loadFrom(String aggregateId, long fromVersion);
}
@Repository
@RequiredArgsConstructor
public class JooqEventStore implements EventStore {
private final DSLContext dsl;
private final ObjectMapper mapper;
@Override
public void append(String aggregateId, long expectedVersion,
List<DomainEvent> events) {
// Optimistic locking: проверяем, что никто не записал события
// с момента нашего чтения
Long currentVersion = dsl
.select(DSL.max(EVENT_STORE.VERSION))
.from(EVENT_STORE)
.where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
.fetchOneInto(Long.class);
if (currentVersion != null && currentVersion != expectedVersion) {
throw new OptimisticLockException(
"Expected version " + expectedVersion
+ ", but was " + currentVersion);
}
long version = expectedVersion;
for (DomainEvent event : events) {
version++;
dsl.insertInto(EVENT_STORE)
.set(EVENT_STORE.AGGREGATE_ID, aggregateId)
.set(EVENT_STORE.VERSION, version)
.set(EVENT_STORE.EVENT_TYPE, event.getClass().getSimpleName())
.set(EVENT_STORE.PAYLOAD, JSONB.jsonb(mapper.writeValueAsString(event)))
.set(EVENT_STORE.CREATED_AT, OffsetDateTime.now())
.execute();
}
}
@Override
public List<DomainEvent> load(String aggregateId) {
return dsl.selectFrom(EVENT_STORE)
.where(EVENT_STORE.AGGREGATE_ID.eq(aggregateId))
.orderBy(EVENT_STORE.VERSION.asc())
.fetch(this::toDomainEvent);
}
}
Агрегат с Event Sourcing
public class Order {
private String id;
private OrderStatus status;
private Money total;
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
private long version = 0;
// --- Восстановление из событий ---
public static Order fromEvents(List<DomainEvent> events) {
Order order = new Order();
for (DomainEvent event : events) {
order.apply(event);
order.version++;
}
return order;
}
private void apply(DomainEvent event) {
switch (event) {
case OrderCreated e -> {
this.id = e.orderId();
this.status = OrderStatus.CREATED;
this.total = e.total();
}
case PaymentReceived e -> this.status = OrderStatus.PAID;
case OrderConfirmed e -> this.status = OrderStatus.CONFIRMED;
case OrderCancelled e -> this.status = OrderStatus.CANCELLED;
default -> throw new IllegalArgumentException(
"Unknown event: " + event.getClass());
}
}
// --- Команды генерируют события ---
public void confirm() {
if (this.status != OrderStatus.PAID) {
throw new IllegalStateException(
"Can only confirm PAID orders, current=" + status);
}
raise(new OrderConfirmed(this.id, OffsetDateTime.now()));
}
private void raise(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
}
Проекции (Read Models)
Event Sourcing разделяет запись и чтение. Писать — в Event Store (append-only). Читать — из проекций, оптимизированных под конкретные запросы. Это естественное продолжение CQRS.
@Component
@RequiredArgsConstructor
public class OrderSummaryProjection {
private final OrderSummaryRepository repository;
@EventHandler
public void on(OrderCreated event) {
repository.save(new OrderSummary(
event.orderId(), event.userId(),
event.total(), "CREATED", event.createdAt()));
}
@EventHandler
public void on(OrderConfirmed event) {
repository.updateStatus(event.orderId(), "CONFIRMED");
}
@EventHandler
public void on(OrderCancelled event) {
repository.updateStatus(event.orderId(), "CANCELLED");
}
}
Когда применять
Подходит:
- Важна полная аудиторская история (финансы, compliance, расчёты с продавцами маркетплейса)
- Нужна возможность «отмотать время» — посмотреть состояние на любой момент
- Сложная доменная логика с множеством переходов состояний
- Нужен temporal query: «какие заказы были в статусе PAID на 15 марта?»
Избыточен:
- CRUD без сложной бизнес-логики
- Простые справочники и настройки
- Проект без требований к аудиту и истории
Подводные камни
- Eventual consistency между Event Store и проекциями. Проекция обновляется асинхронно — пользователь может не увидеть свои изменения сразу.
- Версионирование событий. Событие
OrderCreated_v1не содержит поляcurrency. В_v2оно появилось. Нужна стратегия миграции: upcasting (преобразование старых событий при чтении) или versioned handlers. - Размер Event Store. Агрегат с 10 000 событий — replay занимает время. Решение — снэпшоты: периодически сохранять текущее состояние и replay только от последнего снэпшота.
Idempotent Consumer
Проблема
В распределённой системе сообщения приходят повторно: ретраи брокера, перебалансировка consumer group, at-least-once гарантии. Обработчик должен давать одинаковый результат при повторной обработке.
Решение: таблица обработанных событий
@Component
@RequiredArgsConstructor
public class IdempotentEventProcessor {
private final ProcessedEventRepository processedEvents;
@Transactional
public <T> void process(String eventId, Supplier<T> handler) {
if (processedEvents.existsById(eventId)) {
log.info("Event already processed, skipping: {}", eventId);
return;
}
handler.get();
processedEvents.save(new ProcessedEvent(eventId, Instant.now()));
}
}
@KafkaListener(topics = "payment-events")
public void onPaymentCharged(PaymentChargedEvent event) {
idempotentProcessor.process(event.eventId(), () -> {
Order order = orderRepository.findById(event.orderId())
.orElseThrow();
order.markPaid(event.paymentId());
orderRepository.save(order);
return null;
});
}
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMP NOT NULL DEFAULT now()
);
-- Периодическая очистка
DELETE FROM processed_events
WHERE processed_at < now() - INTERVAL '7 days';
Idempotency Key для API
Тот же принцип работает для REST API. Клиент генерирует уникальный ключ, сервер проверяет — обрабатывался ли запрос с таким ключом:
@PostMapping("/payments")
public ResponseEntity<PaymentResult> charge(
@RequestHeader("Idempotency-Key") String idempotencyKey,
@RequestBody ChargeRequest request) {
// Если запрос с таким ключом уже обработан —
// возвращаем сохранённый результат
return idempotentProcessor.processOrReturn(idempotencyKey, () ->
paymentService.charge(request));
}
Подробнее про Idempotency-Key — в REST API: заголовки.
Distributed Lock
Проблема
Два экземпляра одного сервиса одновременно обрабатывают один заказ. Оба списывают деньги — пользователь платит дважды. Или два relay одновременно читают outbox — событие публикуется дважды.
Решение: распределённая блокировка
Блокировка через Redis (Redisson):
@Component
@RequiredArgsConstructor
public class DistributedLockService {
private final RedissonClient redisson;
public <T> T executeWithLock(String lockKey, Duration timeout,
Supplier<T> action) {
RLock lock = redisson.getLock(lockKey);
boolean acquired = lock.tryLock(
timeout.toMillis(), // время ожидания захвата
timeout.toMillis() * 2, // auto-release через 2x таймаута
TimeUnit.MILLISECONDS);
if (!acquired) {
throw new LockAcquisitionException(
"Cannot acquire lock: " + lockKey);
}
try {
return action.get();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
// Использование: обработка заказа под блокировкой
public void processOrder(Long orderId) {
lockService.executeWithLock(
"order-processing:" + orderId,
Duration.ofSeconds(30),
() -> {
Order order = orderRepository.findById(orderId)
.orElseThrow();
order.confirm(operation);
orderRepository.save(order);
return null;
});
}
Блокировка через БД (SELECT FOR UPDATE)
Если Redis нет — можно использовать базу данных:
// jOOQ: SELECT FOR UPDATE
public Optional<Order> findByIdForUpdate(Long id) {
OrdersRecord record = dsl.selectFrom(ORDERS)
.where(ORDERS.ID.eq(id))
.forUpdate()
.skipLocked() // не ждать, если заблокировано
.fetchOneInto(OrdersRecord.class);
return Optional.ofNullable(record).map(mapper::toDomainOrder);
}
Подводные камни
- Deadlock. Два процесса блокируют ресурсы в разном порядке. Решение — всегда блокировать в одном порядке (например, по ID).
- Забытая блокировка. Процесс взял блокировку, упал, не отпустил. Решение — TTL (auto-release).
- Split-brain в Redis. Redis Sentinel переключил мастер, а старый мастер ещё жив — два процесса держат одну блокировку. Решение — Redlock (блокировка на нескольких независимых нодах).
Как паттерны работают вместе
Реальная система не использует один паттерн в изоляции. Они складываются в пирамиду.
OrderService создаёт заказ и сохраняет событие через Outbox (атомарно). CDC доставляет событие в Kafka. PaymentService обрабатывает через Idempotent Consumer, защищает от конкурентной обработки через Distributed Lock, и (опционально) хранит историю через Event Sourcing. Вся цепочка — SAGA с компенсациями.
Чек-лист: выбор паттерна
| Требование | Паттерн |
|---|---|
| Строгая согласованность сейчас | 2PC (только в рамках одной инфраструктуры) |
| Допустима eventual consistency, шагов > 3 | SAGA — оркестрация |
| Допустима eventual consistency, шагов 2-3 | SAGA — хореография |
| Гарантировать доставку событий | Transactional Outbox + CDC |
| Полная история изменений | Event Sourcing |
| Сообщения приходят повторно | Idempotent Consumer / Idempotency Key |
| Конкурентный доступ к ресурсу | Distributed Lock (Redis/SELECT FOR UPDATE) |
Итого
Распределённые системы не дают бесплатных гарантий. Каждый паттерн решает конкретную проблему согласованности данных:
- 2PC / 3PC — строгая согласованность, ценой блокировок и хрупкости
- SAGA — согласованность через компенсации, eventual consistency
- Transactional Outbox — атомарная запись данных и событий
- CDC — автоматический захват изменений из WAL
- Event Sourcing — полная история как источник истины
- Idempotent Consumer — безопасная повторная обработка
- Distributed Lock — защита от конкурентного доступа
В реальной системе эти паттерны складываются в пирамиду: Outbox + CDC гарантируют доставку, SAGA управляет потоком, Idempotent Consumer защищает от дублей, Distributed Lock не даёт двум процессам одновременно менять один ресурс.
Ссылки
- Кейс: маркетплейс — где применяются эти паттерны в продуктивной архитектуре.
- CQRS — основа для проекций в Event Sourcing.
- Гексагональная архитектура — как изолировать SAGA-оркестратор от инфраструктуры.
- REST API: заголовки и трассировка — про
Idempotency-Key.