Опирается на правила:
R-DIST-IDEM-1…R-DIST-IDEM-5иR-DIST-IDEM-X1…R-DIST-IDEM-X3из Distributed Patterns Style Guide → раздел 3. Idempotency.
Важно знать
- Распределённая система работает в режиме at-least-once — сообщения дублируются при rebalance, timeout и retry. Receiver обязан быть идемпотентным.
- Каждое cross-service сообщение имеет уникальный ID: Kafka —
event_idUUID v7, HTTP money —Idempotency-Keyheader, saga step —saga_id + step_name.- Receiver хранит processed-events в таблице
processed_event; проверка и запись — в однойAsyncSession-транзакции с бизнес-обновлением.- HTTP-команды — таблица
idempotency_record (key, command_hash, response): повтор возвращает сохранённый ответ; конфликт ключа с другой командой →409 Conflict.- Money — двойная защита: client
Idempotency-Key+ UNIQUE constraint(payment_provider, external_payment_id)в БД.- TTL 24-72 часа для idempotency-records: меньше — реальный retry клиента не пройдёт дедупликацию; больше — таблица растёт без пользы.
- Producer тоже обязан давать exactly-once: aiokafka
enable_idempotence=True. Только receiver-side dedup — недостаточно (R-DIST-IDEM-X2).- UUID v7 генерируется через
uuid_utils.uuid7()(PyPIuuid-utils) — time-sortable, без фрагментации B-tree индекса.
В распределённой системе нет «доставлено ровно один раз». Сеть теряет ACK, aiokafka повторяет при rebalance, HTTP-клиент retry-ит на timeout. Единственный способ выжить — receiver проверяет, не обработал ли он это сообщение, и при повторе возвращает тот же результат.
Уникальный ID на каждое сообщение
R-DIST-IDEM-1: каждое cross-service сообщение имеет уникальный ID.
| Транспорт | ID | Источник |
|---|---|---|
| Kafka event | event_id UUID v7 | producer генерирует |
| HTTP money command | Idempotency-Key header | client генерирует |
| Saga step | saga_id + step_name | orchestrator знает |
UUID v7 включает timestamp в старших 48 битах — монотонно растущий ключ, хороший для B-tree индекса в PG. В Python — библиотека uuid-utils:
import uuid_utils
from pydantic import BaseModel, Field
from datetime import datetime
import uuid
class OrderCreatedEvent(BaseModel):
event_id: uuid.UUID = Field(default_factory=uuid_utils.uuid7)
saga_id: uuid.UUID
event_type: str = "OrderCreated.v1"
order_id: int
customer_id: int
amount: str
occurred_at: datetime = Field(default_factory=datetime.utcnow)
Processed-events для aiokafka consumer
R-DIST-IDEM-2: receiver хранит обработанные event_id в БД и проверяет перед обработкой в той же транзакции.
CREATE TABLE processed_event (
event_id uuid PRIMARY KEY,
consumer_name text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX ix_processed_event_processed_at ON processed_event (processed_at);
Реализация через SQLAlchemy AsyncSession — проверка и запись атомарны с бизнес-обновлением:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import uuid
async def try_mark_processed(
session: AsyncSession,
event_id: uuid.UUID,
consumer_name: str,
) -> bool:
result = await session.execute(
text(
"INSERT INTO processed_event (event_id, consumer_name) "
"VALUES (:event_id, :consumer_name) "
"ON CONFLICT DO NOTHING "
"RETURNING event_id"
),
{"event_id": event_id, "consumer_name": consumer_name},
)
return result.rowcount == 1
Consumer на aiokafka — обработка и запись processed_event в одной транзакции:
from aiokafka import AIOKafkaConsumer
from sqlalchemy.ext.asyncio import async_sessionmaker
async def handle_order_created(
event: OrderCreatedEvent,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
async with session.begin():
already_processed = not await try_mark_processed(
session, event.event_id, "order-projection"
)
if already_processed:
return
await session.execute(
text(
"INSERT INTO order_projection (order_id, customer_id, amount, status) "
"VALUES (:order_id, :customer_id, :amount, 'CREATED') "
"ON CONFLICT (order_id) DO UPDATE "
"SET amount = EXCLUDED.amount, status = EXCLUDED.status"
),
{
"order_id": event.order_id,
"customer_id": event.customer_id,
"amount": event.amount,
},
)
INSERT ... ON CONFLICT DO NOTHING RETURNING event_id — атомарная проверка-и-вставка. Если rowcount == 0 — событие уже обработано, пропускаем. Всё в одном session.begin() — либо закоммитилось вместе, либо ничего.
Idempotency-Key для HTTP-команд
R-DIST-IDEM-3: для HTTP money-команд receiver хранит (idempotency_key, command_hash, response).
CREATE TABLE idempotency_record (
idempotency_key text PRIMARY KEY,
command_hash text NOT NULL,
response jsonb NOT NULL,
http_status int NOT NULL,
created_at timestamptz NOT NULL DEFAULT now()
);
FastAPI endpoint — три случая обработки:
import hashlib
import json
from typing import Annotated
from fastapi import APIRouter, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
router = APIRouter()
class ChargeRequest(BaseModel):
customer_id: int
amount: str
currency: str = "RUB"
class PaymentResponse(BaseModel):
payment_id: int
status: str
amount: str
def _command_hash(request: ChargeRequest) -> str:
payload = request.model_dump_json(sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()
@router.post("/payments", status_code=201)
async def charge_payment(
request: ChargeRequest,
session: AsyncSession,
idempotency_key: Annotated[str, Header(alias="Idempotency-Key")],
) -> PaymentResponse:
command_hash = _command_hash(request)
existing = await session.execute(
text("SELECT command_hash, response, http_status FROM idempotency_record WHERE idempotency_key = :key"),
{"key": idempotency_key},
)
row = existing.fetchone()
if row is not None:
if row.command_hash != command_hash:
raise HTTPException(status_code=409, detail="Idempotency key reused with different command")
return PaymentResponse.model_validate(row.response)
async with session.begin():
payment = await _process_payment(session, request)
response = PaymentResponse(payment_id=payment.id, status=payment.status, amount=request.amount)
await session.execute(
text(
"INSERT INTO idempotency_record (idempotency_key, command_hash, response, http_status) "
"VALUES (:key, :hash, :response, :status)"
),
{
"key": idempotency_key,
"hash": command_hash,
"response": response.model_dump(),
"status": 201,
},
)
return response
Три случая:
- Ключ не встречался — обрабатываем, сохраняем результат, возвращаем
201. - Ключ встречался + та же команда — возвращаем сохранённый response. Клиент не получает повторного списания.
- Ключ встречался + другая команда —
409 Conflict. Клиент пытается переиспользовать ключ для другого запроса — это ошибка на стороне клиента.
Двойная защита для money
R-DIST-IDEM-4: money-операции защищаются дважды — client Idempotency-Key + UNIQUE constraint в БД.
CREATE TABLE payment (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
payment_provider text NOT NULL,
external_payment_id text NOT NULL,
customer_id int NOT NULL,
amount numeric(19,4) NOT NULL,
status text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (payment_provider, external_payment_id)
);
Если Idempotency-Key пропустил дубль (разные client retries использовали разные ключи), UniqueViolationError на (payment_provider, external_payment_id) ловит его на уровне БД:
from asyncpg import UniqueViolationError
from sqlalchemy.exc import IntegrityError
async def _process_payment(session: AsyncSession, request: ChargeRequest):
try:
result = await session.execute(
text(
"INSERT INTO payment (payment_provider, external_payment_id, customer_id, amount, status) "
"VALUES (:provider, :ext_id, :customer_id, :amount, 'PENDING') "
"RETURNING id, status"
),
{
"provider": "sberbank",
"ext_id": request.external_payment_id,
"customer_id": request.customer_id,
"amount": request.amount,
},
)
return result.fetchone()
except IntegrityError:
existing = await session.execute(
text("SELECT id, status FROM payment WHERE payment_provider = :provider AND external_payment_id = :ext_id"),
{"provider": "sberbank", "ext_id": request.external_payment_id},
)
return existing.fetchone()
Money — единственный класс данных, где одного слоя защиты недостаточно. Idempotency-Key может потеряться при смене клиентской сессии, а UNIQUE constraint в БД — последний барьер перед двойным списанием.
TTL и cleanup
R-DIST-IDEM-5: idempotency-records хранятся 24-72 часа. Cleanup — отдельным фоновым заданием через APScheduler:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from sqlalchemy import text
async def cleanup_idempotency_records(session_factory: async_sessionmaker[AsyncSession]) -> None:
cutoff = datetime.now(tz=timezone.utc) - timedelta(hours=72)
async with session_factory() as session:
async with session.begin():
await session.execute(
text("DELETE FROM idempotency_record WHERE created_at < :cutoff"),
{"cutoff": cutoff},
)
await session.execute(
text("DELETE FROM processed_event WHERE processed_at < :cutoff"),
{"cutoff": cutoff},
)
def setup_scheduler(session_factory: async_sessionmaker[AsyncSession]) -> AsyncIOScheduler:
scheduler = AsyncIOScheduler()
scheduler.add_job(
cleanup_idempotency_records,
trigger="cron",
hour=3,
minute=0,
timezone="UTC",
args=[session_factory],
)
return scheduler
Cleanup в lifespan FastAPI:
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = setup_scheduler(session_factory)
scheduler.start()
yield
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
Cleanup ночью по UTC — минимальная нагрузка на autovacuum в рабочие часы.
Producer exactly-once на aiokafka
R-DIST-IDEM-X2: producer тоже обязан иметь exactly-once гарантии — aiokafka enable_idempotence=True:
from aiokafka import AIOKafkaProducer
import json
async def create_producer() -> AIOKafkaProducer:
return AIOKafkaProducer(
bootstrap_servers="kafka:9092",
enable_idempotence=True, # R-KFK-PROD-1 — exactly-once на producer
acks="all",
max_in_flight_requests_per_connection=5,
compression_type="gzip",
)
Без enable_idempotence=True aiokafka может опубликовать одно и то же сообщение дважды при partial failure — broker получает два разных event_id для одной бизнес-операции, и receiver-side dedup по event_id не поможет.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Receiver без dedup для money | R-DIST-IDEM-X1 | processed_event + Idempotency-Key |
| Только receiver-side dedup | R-DIST-IDEM-X2 | aiokafka enable_idempotence=True + receiver dedup |
| Новый UUID при каждом retry | R-DIST-IDEM-X3 | один ключ генерируется один раз на бизнес-операцию |
| TTL idempotency-records < 24h | R-DIST-IDEM-5 | 24-72 часа |
| TTL idempotency-records > 72h без причины | R-DIST-IDEM-5 | 24-72 часа + cleanup через APScheduler |
| Один слой защиты для money | R-DIST-IDEM-4 | client key + UNIQUE constraint в БД |
| event_id без UUID v7 | R-DIST-IDEM-1 | uuid_utils.uuid7() — time-sortable, без фрагментации |
| Проверка и запись processed_event в разных транзакциях | R-DIST-IDEM-2 | один session.begin() для dedup-check + бизнес-обновления |
Куда дальше
- Распределённые паттерны — Saga — каждый шаг саги обязан быть идемпотентным.
- Compensation — compensation тоже идемпотентен, может повторяться.
- Outbox + Inbox — outbox решает producer-side гарантии exactly-once.
- Eventual consistency — idempotency — предпосылка для безопасного replay событий.
- Distributed transactions — почему 2PC не решает проблему дубликатов.
- Когда нужны распределённые паттерны — перед введением idempotency убедись, что паттерн вообще нужен.