Опирается на правила: R-KFK-IDEM-1R-KFK-IDEM-4 и R-KFK-IDEM-X1R-KFK-IDEM-X2 из Kafka Style Guide → раздел 4. Idempotent consumer.

Важно знать

  • Kafka — at-least-once: дубликаты норма (rebalance, DLQ replay, offset reset), consumer обязан с ними справляться.
  • Уникальный event_id (UUID v7) в payload — единственный надёжный dedup-ключ.
  • processed_event таблица с PRIMARY KEY на event_id — UNIQUE constraint защищает даже под race conditions.
  • Запись в processed_event и бизнес-результат — в одной транзакции.
  • Manual commit (enable_auto_commit=False) + await consumer.commit() только после успешной обработки.
  • Money — двойная защита: event_id + Idempotency-Key на downstream HTTP.
  • TTL processed_event обязателен: partitioning + drop partition или фоновая задача.
  • Kafka offset как dedup — нельзя: offset зависит от consumer-group, не от события.

Любой aiokafka consumer обязан быть idempotent. Producer с enable_idempotence=True устраняет дубликаты на уровне producer-partition, но дубликаты возможны при rebalance до commit, при replay из DLQ, при auto_offset_reset="earliest". Принять «обычно срабатывает один раз» — значит ждать инцидент.

Уникальный event_id в payload

R-KFK-IDEM-1: каждое событие имеет UUID v7 идентификатор в payload.

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal


@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: uuid.UUID
    event_type: str
    occurred_at: datetime
    order_id: int
    customer_id: int
    total_amount: Decimal

    @classmethod
    def from_order(cls, order: "Order") -> "OrderConfirmedEvent":
        return cls(
            event_id=uuid.uuid7(),
            event_type="order.confirmed.v1",
            occurred_at=order.confirmed_at,
            order_id=order.id,
            customer_id=order.customer_id,
            total_amount=order.total_amount,
        )

UUID v7 — time-sortable (первые 48 бит — timestamp). Это даёт sequential insert в processed_event PK B-tree с низкой фрагментацией и позволяет сортировать события по времени без отдельного occurred_at индекса.

На границе consumer — Pydantic-модель (R-KFK-EVT-4):

from pydantic import BaseModel, UUID7
from datetime import datetime
from decimal import Decimal


class OrderConfirmedMessage(BaseModel):
    event_id: UUID7
    event_type: str
    occurred_at: datetime
    order_id: int
    customer_id: int
    total_amount: Decimal

processed_event таблица

R-KFK-IDEM-2: DDL с PRIMARY KEY на event_id.

CREATE TABLE processed_event (
    event_id       uuid        NOT NULL,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY    (event_id, consumer_group)
);

CREATE INDEX ix_processed_event_processed_at ON processed_event(processed_at);

PRIMARY KEY на (event_id, consumer_group) — когда несколько сервисов читают один топик (billing-order-confirmed и analytics-order-confirmed), каждый ведёт свой dedup независимо. Если топик читает ровно один сервис — достаточно PRIMARY KEY на event_id.

INSERT ... ON CONFLICT DO NOTHING — атомарный dedup на уровне БД. Даже если два worker-процесса получат дубль одновременно — один INSERT пройдёт, второй вернёт 0 строк.

async def try_mark_processed(
    conn: asyncpg.Connection,
    event_id: uuid.UUID,
    consumer_group: str,
) -> bool:
    result = await conn.execute(
        """
        INSERT INTO processed_event (event_id, consumer_group)
        VALUES ($1, $2)
        ON CONFLICT DO NOTHING
        """,
        event_id,
        consumer_group,
    )
    return result == "INSERT 0 1"

Dedup в consumer-цикле

R-KFK-IDEM-3: проверка processed_event и бизнес-логика — в одной транзакции через asyncpg.

from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
import asyncpg
import json

CONSUMER_GROUP = "billing-order-confirmed"


async def run_consumer(
    pool: asyncpg.Pool,
    billing_service: "BillingService",
) -> None:
    consumer = AIOKafkaConsumer(
        "order-service.order.confirmed",
        bootstrap_servers=settings.kafka_brokers,
        group_id=CONSUMER_GROUP,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            await _handle(msg, pool, billing_service, consumer)
    finally:
        await consumer.stop()


async def _handle(
    msg,
    pool: asyncpg.Pool,
    billing_service: "BillingService",
    consumer: AIOKafkaConsumer,
) -> None:
    try:
        event = OrderConfirmedMessage.model_validate(msg.value)
    except ValidationError:
        await consumer.commit()
        return

    async with pool.acquire() as conn:
        async with conn.transaction():
            already = not await try_mark_processed(conn, event.event_id, CONSUMER_GROUP)
            if already:
                pass
            else:
                await billing_service.charge(conn, event.order_id, event.total_amount)

    await consumer.commit()

Транзакция оборачивает try_mark_processed и billing_service.charge. Если charge бросает исключение — processed_event тоже откатывается, следующий poll обработает событие заново.

Если процесс упал после успешного commit транзакции, но до await consumer.commit() — следующий poll вернёт то же сообщение. try_mark_processed вернёт False (запись уже есть), обработка пропустится, offset зафиксируется. Идемпотентность работает.

Money — двойная защита

R-KFK-IDEM-4: для денежных операций — event_id + Idempotency-Key на downstream HTTP.

import httpx


class BillingService:
    def __init__(self, payment_client: httpx.AsyncClient) -> None:
        self._client = payment_client

    async def charge(
        self,
        conn: asyncpg.Connection,
        order_id: int,
        amount: Decimal,
        idempotency_key: uuid.UUID,
    ) -> None:
        response = await self._client.post(
            "/payments/charge",
            json={"order_id": order_id, "amount": str(amount)},
            headers={"Idempotency-Key": str(idempotency_key)},
        )
        response.raise_for_status()
        await conn.execute(
            "UPDATE orders SET payment_status = 'charged' WHERE id = $1",
            order_id,
        )

Сценарий без Idempotency-Key: payment-provider принял charge, ответил 200, но connection reset — consumer не получил ответ, транзакция откатилась, processed_event пуст. Следующий poll → try_mark_processed=True → второй charge. Provider дебетует счёт дважды.

С Idempotency-Key = event_id provider дедуплицирует на своей стороне. Клиент Customer получает корректное списание независимо от числа повторов.

TTL processed_event

Таблица растёт линейно с потоком событий. TTL обязателен (R-KFK-IDEM-2).

Вариант 1 — range partitioning + DROP старой партиции:

CREATE TABLE processed_event (
    event_id       uuid        NOT NULL,
    consumer_group text        NOT NULL,
    processed_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY    (event_id, consumer_group, processed_at)
) PARTITION BY RANGE (processed_at);

CREATE TABLE processed_event_2026_06
    PARTITION OF processed_event
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

Партиции создаются заранее (через Liquibase или отдельную cron-задачу). DROP старой партиции мгновенный в отличие от DELETE миллионов строк.

Вариант 2 — фоновая задача через APScheduler:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import timedelta, datetime, timezone


async def cleanup_processed_events(pool: asyncpg.Pool) -> None:
    cutoff = datetime.now(timezone.utc) - timedelta(days=7)
    async with pool.acquire() as conn:
        deleted = await conn.execute(
            "DELETE FROM processed_event WHERE processed_at < $1",
            cutoff,
        )


scheduler = AsyncIOScheduler()
scheduler.add_job(
    cleanup_processed_events,
    "cron",
    hour=3,
    minute=0,
    kwargs={"pool": pool},
)

Retention 7 дней достаточно для большинства случаев: дубликаты приходят в пределах часов, не дней. Для топиков с высоким потоком (>1M событий/день) — только partitioning.

Что запрещено

АнтипаттернПравилоЧто взамен
Listener без проверки event_id для critical-топиковR-KFK-IDEM-X1processed_event + try_mark_processed
Kafka offset как dedup-ключR-KFK-IDEM-X2event_id UUID v7 в payload
Money без Idempotency-Key на HTTPR-KFK-IDEM-4Idempotency-Key: event_id в headers
processed_event без TTLR-KFK-IDEM-2partitioning или фоновая задача
try_mark_processed без ON CONFLICT DO NOTHINGR-KFK-IDEM-2INSERT ... ON CONFLICT DO NOTHING
charge вне транзакции с try_mark_processedR-KFK-IDEM-3одна conn.transaction() на обе операции
enable_auto_commit=TrueR-KFK-CONS-X1enable_auto_commit=False, await consumer.commit() после TX
Random UUID вместо UUID v7R-KFK-IDEM-1uuid.uuid7() (time-sortable, sequential insert)

Куда дальше

  • Конфигурация — KafkaSettings через pydantic-settings, bootstrap_servers из env.
  • Consumer — AIOKafkaConsumer, manual commit, group_id, auto_offset_reset.
  • Event design — event_id в payload, @dataclass(frozen=True), Pydantic на границе.
  • Observability — consumer lag alert, traceparent в headers.
  • Outbox publishing — event_id генерируется на стороне producer в outbox.
  • Producer — enable_idempotence=True, partition key = aggregate id.
  • Retry topic + DLQ — DLQ replay как источник дублей, max-attempts.
  • Security — TLS, per-service ACL, PII в restricted-топиках.