Опирается на правила: R-DIST-IDEM-1R-DIST-IDEM-5 и R-DIST-IDEM-X1R-DIST-IDEM-X3 из Distributed Patterns Style Guide → раздел 3. Idempotency.

Важно знать

  • Распределённая система работает в режиме at-least-once — сообщения дублируются при rebalance, timeout и retry. Receiver обязан быть идемпотентным.
  • Каждое cross-service сообщение имеет уникальный ID: Kafka — event_id UUID v7, HTTP money — Idempotency-Key header, saga step — saga_id + step_name.
  • Receiver хранит processed-events в таблице processed_event; проверка и запись — в одной AsyncSession-транзакции с бизнес-обновлением.
  • HTTP-команды — таблица idempotency_record (key, command_hash, response): повтор возвращает сохранённый ответ; конфликт ключа с другой командой → 409 Conflict.
  • Money — двойная защита: client Idempotency-Key + UNIQUE constraint (payment_provider, external_payment_id) в БД.
  • TTL 24-72 часа для idempotency-records: меньше — реальный retry клиента не пройдёт дедупликацию; больше — таблица растёт без пользы.
  • Producer тоже обязан давать exactly-once: aiokafka enable_idempotence=True. Только receiver-side dedup — недостаточно (R-DIST-IDEM-X2).
  • UUID v7 генерируется через uuid_utils.uuid7() (PyPI uuid-utils) — time-sortable, без фрагментации B-tree индекса.

В распределённой системе нет «доставлено ровно один раз». Сеть теряет ACK, aiokafka повторяет при rebalance, HTTP-клиент retry-ит на timeout. Единственный способ выжить — receiver проверяет, не обработал ли он это сообщение, и при повторе возвращает тот же результат.

Уникальный ID на каждое сообщение

R-DIST-IDEM-1: каждое cross-service сообщение имеет уникальный ID.

ТранспортIDИсточник
Kafka eventevent_id UUID v7producer генерирует
HTTP money commandIdempotency-Key headerclient генерирует
Saga stepsaga_id + step_nameorchestrator знает

UUID v7 включает timestamp в старших 48 битах — монотонно растущий ключ, хороший для B-tree индекса в PG. В Python — библиотека uuid-utils:

import uuid_utils

from pydantic import BaseModel, Field
from datetime import datetime
import uuid


class OrderCreatedEvent(BaseModel):
    event_id: uuid.UUID = Field(default_factory=uuid_utils.uuid7)
    saga_id: uuid.UUID
    event_type: str = "OrderCreated.v1"
    order_id: int
    customer_id: int
    amount: str
    occurred_at: datetime = Field(default_factory=datetime.utcnow)

Processed-events для aiokafka consumer

R-DIST-IDEM-2: receiver хранит обработанные event_id в БД и проверяет перед обработкой в той же транзакции.

CREATE TABLE processed_event (
    event_id      uuid PRIMARY KEY,
    consumer_name text        NOT NULL,
    processed_at  timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event (processed_at);

Реализация через SQLAlchemy AsyncSession — проверка и запись атомарны с бизнес-обновлением:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import uuid


async def try_mark_processed(
    session: AsyncSession,
    event_id: uuid.UUID,
    consumer_name: str,
) -> bool:
    result = await session.execute(
        text(
            "INSERT INTO processed_event (event_id, consumer_name) "
            "VALUES (:event_id, :consumer_name) "
            "ON CONFLICT DO NOTHING "
            "RETURNING event_id"
        ),
        {"event_id": event_id, "consumer_name": consumer_name},
    )
    return result.rowcount == 1

Consumer на aiokafka — обработка и запись processed_event в одной транзакции:

from aiokafka import AIOKafkaConsumer
from sqlalchemy.ext.asyncio import async_sessionmaker


async def handle_order_created(
    event: OrderCreatedEvent,
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    async with session_factory() as session:
        async with session.begin():
            already_processed = not await try_mark_processed(
                session, event.event_id, "order-projection"
            )
            if already_processed:
                return

            await session.execute(
                text(
                    "INSERT INTO order_projection (order_id, customer_id, amount, status) "
                    "VALUES (:order_id, :customer_id, :amount, 'CREATED') "
                    "ON CONFLICT (order_id) DO UPDATE "
                    "SET amount = EXCLUDED.amount, status = EXCLUDED.status"
                ),
                {
                    "order_id": event.order_id,
                    "customer_id": event.customer_id,
                    "amount": event.amount,
                },
            )

INSERT ... ON CONFLICT DO NOTHING RETURNING event_id — атомарная проверка-и-вставка. Если rowcount == 0 — событие уже обработано, пропускаем. Всё в одном session.begin() — либо закоммитилось вместе, либо ничего.

Idempotency-Key для HTTP-команд

R-DIST-IDEM-3: для HTTP money-команд receiver хранит (idempotency_key, command_hash, response).

CREATE TABLE idempotency_record (
    idempotency_key text        PRIMARY KEY,
    command_hash    text        NOT NULL,
    response        jsonb       NOT NULL,
    http_status     int         NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now()
);

FastAPI endpoint — три случая обработки:

import hashlib
import json
from typing import Annotated

from fastapi import APIRouter, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

router = APIRouter()


class ChargeRequest(BaseModel):
    customer_id: int
    amount: str
    currency: str = "RUB"


class PaymentResponse(BaseModel):
    payment_id: int
    status: str
    amount: str


def _command_hash(request: ChargeRequest) -> str:
    payload = request.model_dump_json(sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


@router.post("/payments", status_code=201)
async def charge_payment(
    request: ChargeRequest,
    session: AsyncSession,
    idempotency_key: Annotated[str, Header(alias="Idempotency-Key")],
) -> PaymentResponse:
    command_hash = _command_hash(request)

    existing = await session.execute(
        text("SELECT command_hash, response, http_status FROM idempotency_record WHERE idempotency_key = :key"),
        {"key": idempotency_key},
    )
    row = existing.fetchone()

    if row is not None:
        if row.command_hash != command_hash:
            raise HTTPException(status_code=409, detail="Idempotency key reused with different command")
        return PaymentResponse.model_validate(row.response)

    async with session.begin():
        payment = await _process_payment(session, request)
        response = PaymentResponse(payment_id=payment.id, status=payment.status, amount=request.amount)
        await session.execute(
            text(
                "INSERT INTO idempotency_record (idempotency_key, command_hash, response, http_status) "
                "VALUES (:key, :hash, :response, :status)"
            ),
            {
                "key": idempotency_key,
                "hash": command_hash,
                "response": response.model_dump(),
                "status": 201,
            },
        )

    return response

Три случая:

  • Ключ не встречался — обрабатываем, сохраняем результат, возвращаем 201.
  • Ключ встречался + та же команда — возвращаем сохранённый response. Клиент не получает повторного списания.
  • Ключ встречался + другая команда409 Conflict. Клиент пытается переиспользовать ключ для другого запроса — это ошибка на стороне клиента.

Двойная защита для money

R-DIST-IDEM-4: money-операции защищаются дважды — client Idempotency-Key + UNIQUE constraint в БД.

CREATE TABLE payment (
    id                   bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    payment_provider     text          NOT NULL,
    external_payment_id  text          NOT NULL,
    customer_id          int           NOT NULL,
    amount               numeric(19,4) NOT NULL,
    status               text          NOT NULL,
    created_at           timestamptz   NOT NULL DEFAULT now(),
    UNIQUE (payment_provider, external_payment_id)
);

Если Idempotency-Key пропустил дубль (разные client retries использовали разные ключи), UniqueViolationError на (payment_provider, external_payment_id) ловит его на уровне БД:

from asyncpg import UniqueViolationError
from sqlalchemy.exc import IntegrityError


async def _process_payment(session: AsyncSession, request: ChargeRequest):
    try:
        result = await session.execute(
            text(
                "INSERT INTO payment (payment_provider, external_payment_id, customer_id, amount, status) "
                "VALUES (:provider, :ext_id, :customer_id, :amount, 'PENDING') "
                "RETURNING id, status"
            ),
            {
                "provider": "sberbank",
                "ext_id": request.external_payment_id,
                "customer_id": request.customer_id,
                "amount": request.amount,
            },
        )
        return result.fetchone()
    except IntegrityError:
        existing = await session.execute(
            text("SELECT id, status FROM payment WHERE payment_provider = :provider AND external_payment_id = :ext_id"),
            {"provider": "sberbank", "ext_id": request.external_payment_id},
        )
        return existing.fetchone()

Money — единственный класс данных, где одного слоя защиты недостаточно. Idempotency-Key может потеряться при смене клиентской сессии, а UNIQUE constraint в БД — последний барьер перед двойным списанием.

TTL и cleanup

R-DIST-IDEM-5: idempotency-records хранятся 24-72 часа. Cleanup — отдельным фоновым заданием через APScheduler:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from sqlalchemy import text


async def cleanup_idempotency_records(session_factory: async_sessionmaker[AsyncSession]) -> None:
    cutoff = datetime.now(tz=timezone.utc) - timedelta(hours=72)
    async with session_factory() as session:
        async with session.begin():
            await session.execute(
                text("DELETE FROM idempotency_record WHERE created_at < :cutoff"),
                {"cutoff": cutoff},
            )
            await session.execute(
                text("DELETE FROM processed_event WHERE processed_at < :cutoff"),
                {"cutoff": cutoff},
            )


def setup_scheduler(session_factory: async_sessionmaker[AsyncSession]) -> AsyncIOScheduler:
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        cleanup_idempotency_records,
        trigger="cron",
        hour=3,
        minute=0,
        timezone="UTC",
        args=[session_factory],
    )
    return scheduler

Cleanup в lifespan FastAPI:

from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = setup_scheduler(session_factory)
    scheduler.start()
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)

Cleanup ночью по UTC — минимальная нагрузка на autovacuum в рабочие часы.

Producer exactly-once на aiokafka

R-DIST-IDEM-X2: producer тоже обязан иметь exactly-once гарантии — aiokafka enable_idempotence=True:

from aiokafka import AIOKafkaProducer
import json


async def create_producer() -> AIOKafkaProducer:
    return AIOKafkaProducer(
        bootstrap_servers="kafka:9092",
        enable_idempotence=True,       # R-KFK-PROD-1 — exactly-once на producer
        acks="all",
        max_in_flight_requests_per_connection=5,
        compression_type="gzip",
    )

Без enable_idempotence=True aiokafka может опубликовать одно и то же сообщение дважды при partial failure — broker получает два разных event_id для одной бизнес-операции, и receiver-side dedup по event_id не поможет.

Что запрещено

АнтипаттернПравилоЧто взамен
Receiver без dedup для moneyR-DIST-IDEM-X1processed_event + Idempotency-Key
Только receiver-side dedupR-DIST-IDEM-X2aiokafka enable_idempotence=True + receiver dedup
Новый UUID при каждом retryR-DIST-IDEM-X3один ключ генерируется один раз на бизнес-операцию
TTL idempotency-records < 24hR-DIST-IDEM-524-72 часа
TTL idempotency-records > 72h без причиныR-DIST-IDEM-524-72 часа + cleanup через APScheduler
Один слой защиты для moneyR-DIST-IDEM-4client key + UNIQUE constraint в БД
event_id без UUID v7R-DIST-IDEM-1uuid_utils.uuid7() — time-sortable, без фрагментации
Проверка и запись processed_event в разных транзакцияхR-DIST-IDEM-2один session.begin() для dedup-check + бизнес-обновления

Куда дальше

  • Распределённые паттерны — Saga — каждый шаг саги обязан быть идемпотентным.
  • Compensation — compensation тоже идемпотентен, может повторяться.
  • Outbox + Inbox — outbox решает producer-side гарантии exactly-once.
  • Eventual consistency — idempotency — предпосылка для безопасного replay событий.
  • Distributed transactions — почему 2PC не решает проблему дубликатов.
  • Когда нужны распределённые паттерны — перед введением idempotency убедись, что паттерн вообще нужен.