Опирается на правила:
R-KFK-IDEM-1…R-KFK-IDEM-4иR-KFK-IDEM-X1…R-KFK-IDEM-X2из Kafka Style Guide → раздел 4. Idempotent consumer.
Важно знать
- Kafka — at-least-once: дубликаты норма (rebalance, DLQ replay, offset reset), consumer обязан с ними справляться.
- Уникальный
event_id(UUID v7) в payload — единственный надёжный dedup-ключ.processed_eventтаблица с PRIMARY KEY наevent_id— UNIQUE constraint защищает даже под race conditions.- Запись в
processed_eventи бизнес-результат — в одной транзакции.- Manual commit (
enable_auto_commit=False) +await consumer.commit()только после успешной обработки.- Money — двойная защита:
event_id+Idempotency-Keyна downstream HTTP.- TTL
processed_eventобязателен: partitioning + drop partition или фоновая задача.- Kafka offset как dedup — нельзя: offset зависит от consumer-group, не от события.
Любой aiokafka consumer обязан быть idempotent. Producer с enable_idempotence=True устраняет дубликаты на уровне producer-partition, но дубликаты возможны при rebalance до commit, при replay из DLQ, при auto_offset_reset="earliest". Принять «обычно срабатывает один раз» — значит ждать инцидент.
Уникальный event_id в payload
R-KFK-IDEM-1: каждое событие имеет UUID v7 идентификатор в payload.
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: uuid.UUID
event_type: str
occurred_at: datetime
order_id: int
customer_id: int
total_amount: Decimal
@classmethod
def from_order(cls, order: "Order") -> "OrderConfirmedEvent":
return cls(
event_id=uuid.uuid7(),
event_type="order.confirmed.v1",
occurred_at=order.confirmed_at,
order_id=order.id,
customer_id=order.customer_id,
total_amount=order.total_amount,
)
UUID v7 — time-sortable (первые 48 бит — timestamp). Это даёт sequential insert в processed_event PK B-tree с низкой фрагментацией и позволяет сортировать события по времени без отдельного occurred_at индекса.
На границе consumer — Pydantic-модель (R-KFK-EVT-4):
from pydantic import BaseModel, UUID7
from datetime import datetime
from decimal import Decimal
class OrderConfirmedMessage(BaseModel):
event_id: UUID7
event_type: str
occurred_at: datetime
order_id: int
customer_id: int
total_amount: Decimal
processed_event таблица
R-KFK-IDEM-2: DDL с PRIMARY KEY на event_id.
CREATE TABLE processed_event (
event_id uuid NOT NULL,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (event_id, consumer_group)
);
CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);
PRIMARY KEY на (event_id, consumer_group) — когда несколько сервисов читают один топик (billing-order-confirmed и analytics-order-confirmed), каждый ведёт свой dedup независимо. Если топик читает ровно один сервис — достаточно PRIMARY KEY на event_id.
INSERT ... ON CONFLICT DO NOTHING — атомарный dedup на уровне БД. Даже если два worker-процесса получат дубль одновременно — один INSERT пройдёт, второй вернёт 0 строк.
async def try_mark_processed(
conn: asyncpg.Connection,
event_id: uuid.UUID,
consumer_group: str,
) -> bool:
result = await conn.execute(
"""
INSERT INTO processed_event (event_id, consumer_group)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
""",
event_id,
consumer_group,
)
return result == "INSERT 0 1"
Dedup в consumer-цикле
R-KFK-IDEM-3: проверка processed_event и бизнес-логика — в одной транзакции через asyncpg.
from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
import asyncpg
import json
CONSUMER_GROUP = "billing-order-confirmed"
async def run_consumer(
pool: asyncpg.Pool,
billing_service: "BillingService",
) -> None:
consumer = AIOKafkaConsumer(
"order-service.order.confirmed",
bootstrap_servers=settings.kafka_brokers,
group_id=CONSUMER_GROUP,
enable_auto_commit=False,
auto_offset_reset="earliest",
value_deserializer=lambda b: json.loads(b),
)
await consumer.start()
try:
async for msg in consumer:
await _handle(msg, pool, billing_service, consumer)
finally:
await consumer.stop()
async def _handle(
msg,
pool: asyncpg.Pool,
billing_service: "BillingService",
consumer: AIOKafkaConsumer,
) -> None:
try:
event = OrderConfirmedMessage.model_validate(msg.value)
except ValidationError:
await consumer.commit()
return
async with pool.acquire() as conn:
async with conn.transaction():
already = not await try_mark_processed(conn, event.event_id, CONSUMER_GROUP)
if already:
pass
else:
await billing_service.charge(conn, event.order_id, event.total_amount)
await consumer.commit()
Транзакция оборачивает try_mark_processed и billing_service.charge. Если charge бросает исключение — processed_event тоже откатывается, следующий poll обработает событие заново.
Если процесс упал после успешного commit транзакции, но до await consumer.commit() — следующий poll вернёт то же сообщение. try_mark_processed вернёт False (запись уже есть), обработка пропустится, offset зафиксируется. Идемпотентность работает.
Money — двойная защита
R-KFK-IDEM-4: для денежных операций — event_id + Idempotency-Key на downstream HTTP.
import httpx
class BillingService:
def __init__(self, payment_client: httpx.AsyncClient) -> None:
self._client = payment_client
async def charge(
self,
conn: asyncpg.Connection,
order_id: int,
amount: Decimal,
idempotency_key: uuid.UUID,
) -> None:
response = await self._client.post(
"/payments/charge",
json={"order_id": order_id, "amount": str(amount)},
headers={"Idempotency-Key": str(idempotency_key)},
)
response.raise_for_status()
await conn.execute(
"UPDATE orders SET payment_status = 'charged' WHERE id = $1",
order_id,
)
Сценарий без Idempotency-Key: payment-provider принял charge, ответил 200, но connection reset — consumer не получил ответ, транзакция откатилась, processed_event пуст. Следующий poll → try_mark_processed=True → второй charge. Provider дебетует счёт дважды.
С Idempotency-Key = event_id provider дедуплицирует на своей стороне. Клиент Customer получает корректное списание независимо от числа повторов.
TTL processed_event
Таблица растёт линейно с потоком событий. TTL обязателен (R-KFK-IDEM-2).
Вариант 1 — range partitioning + DROP старой партиции:
CREATE TABLE processed_event (
event_id uuid NOT NULL,
consumer_group text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (event_id, consumer_group, processed_at)
) PARTITION BY RANGE (processed_at);
CREATE TABLE processed_event_2026_06
PARTITION OF processed_event
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
Партиции создаются заранее (через Liquibase или отдельную cron-задачу). DROP старой партиции мгновенный в отличие от DELETE миллионов строк.
Вариант 2 — фоновая задача через APScheduler:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import timedelta, datetime, timezone
async def cleanup_processed_events(pool: asyncpg.Pool) -> None:
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
async with pool.acquire() as conn:
deleted = await conn.execute(
"DELETE FROM processed_event WHERE processed_at < $1",
cutoff,
)
scheduler = AsyncIOScheduler()
scheduler.add_job(
cleanup_processed_events,
"cron",
hour=3,
minute=0,
kwargs={"pool": pool},
)
Retention 7 дней достаточно для большинства случаев: дубликаты приходят в пределах часов, не дней. Для топиков с высоким потоком (>1M событий/день) — только partitioning.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Listener без проверки event_id для critical-топиков | R-KFK-IDEM-X1 | processed_event + try_mark_processed |
| Kafka offset как dedup-ключ | R-KFK-IDEM-X2 | event_id UUID v7 в payload |
Money без Idempotency-Key на HTTP | R-KFK-IDEM-4 | Idempotency-Key: event_id в headers |
processed_event без TTL | R-KFK-IDEM-2 | partitioning или фоновая задача |
try_mark_processed без ON CONFLICT DO NOTHING | R-KFK-IDEM-2 | INSERT ... ON CONFLICT DO NOTHING |
charge вне транзакции с try_mark_processed | R-KFK-IDEM-3 | одна conn.transaction() на обе операции |
enable_auto_commit=True | R-KFK-CONS-X1 | enable_auto_commit=False, await consumer.commit() после TX |
| Random UUID вместо UUID v7 | R-KFK-IDEM-1 | uuid.uuid7() (time-sortable, sequential insert) |
Куда дальше
- Конфигурация —
KafkaSettingsчерез pydantic-settings, bootstrap_servers из env. - Consumer —
AIOKafkaConsumer, manual commit,group_id,auto_offset_reset. - Event design —
event_idв payload,@dataclass(frozen=True), Pydantic на границе. - Observability — consumer lag alert, traceparent в headers.
- Outbox publishing —
event_idгенерируется на стороне producer в outbox. - Producer —
enable_idempotence=True, partition key = aggregate id. - Retry topic + DLQ — DLQ replay как источник дублей, max-attempts.
- Security — TLS, per-service ACL, PII в restricted-топиках.