Опирается на правила: R-KFK-RTRY-1R-KFK-RTRY-4 и R-KFK-RTRY-X1R-KFK-RTRY-X4 из Kafka Style Guide → раздел 5. Retry topic + DLQ.

Важно знать

  • Blocking retry через asyncio.sleep в listener — антипаттерн, держит poll-цикл, провоцирует rebalance.
  • Non-blocking retry — отдельные топики с возрастающим delay: *.retry-5s, *.retry-30s, *.retry-5m, *.dlq.
  • AIOKafkaProducer публикует в retry-topic и немедленно освобождает poll-цикл основного consumer.
  • Retry только для transient: сеть, 5xx downstream, DB timeout. Не retry: 4xx, validation, баги кода.
  • DLQ-monitoring — alert если за час > N сообщений (R-KFK-RTRY-3).
  • Replay из DLQ — ручная операция через admin-endpoint, не автоматическая.
  • Проглатывание exception (except Exception: await consumer.commit()) — событие потеряно безвозвратно.
  • Retry-топик без max-attempts = бесконечный lock-step с проблемной системой.

Aiokafka-consumer должен переживать временные сбои внешних систем без потери сообщений и без блокировки event-loop. Retry topic + DLQ — стандартный паттерн: каждая попытка публикуется в отдельный topic с увеличивающимся delay, после исчерпания попыток сообщение падает в DLQ для ручного разбора.

Структура retry-топиков

R-KFK-RTRY-1: отдельные топики, не блокирующий повтор.

orders.confirmed              ← основной
orders.confirmed.retry-5s     ← попытка 2: через 5 с
orders.confirmed.retry-30s    ← попытка 3: через 30 с
orders.confirmed.retry-5m     ← попытка 4: через 5 мин
orders.confirmed.dlq          ← окончательный fail

Топики создаются явно через admin CLI или terraform — не auto-create, иначе typo создаст топик с дефолтными настройками в проде.

Retry-обёртка для aiokafka

Spring Kafka предоставляет @RetryableTopic из коробки; в Python этого нет — пишем обёртку вокруг consumer-loop. Главный принцип: при transient-ошибке публикуем событие в retry-топик и немедленно коммитим offset основного топика. Poll-цикл продолжается.

from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Callable, Awaitable

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord

logger = logging.getLogger(__name__)

RETRY_SCHEDULE = [5_000, 30_000, 300_000]  # мс: 5s, 30s, 5min


@dataclass(frozen=True)
class RetryMeta:
    attempt: int
    original_topic: str


class RetryableError(Exception):
    pass


class NonRetryableError(Exception):
    pass


async def run_with_retry(
    consumer: AIOKafkaConsumer,
    producer: AIOKafkaProducer,
    handler: Callable[[dict], Awaitable[None]],
    dlq_topic: str,
) -> None:
    async for msg in consumer:
        await _dispatch(msg, consumer, producer, handler, dlq_topic)


async def _dispatch(
    msg: ConsumerRecord,
    consumer: AIOKafkaConsumer,
    producer: AIOKafkaProducer,
    handler: Callable[[dict], Awaitable[None]],
    dlq_topic: str,
) -> None:
    attempt = int(msg.headers.get(b"x-retry-attempt", b"0"))
    original_topic = (msg.headers.get(b"x-original-topic") or msg.topic.encode()).decode()
    payload = json.loads(msg.value)

    try:
        await handler(payload)
        await consumer.commit()
    except RetryableError as exc:
        if attempt < len(RETRY_SCHEDULE):
            delay_ms = RETRY_SCHEDULE[attempt]
            retry_topic = f"{original_topic}.retry-{_label(delay_ms)}"
            headers = [
                (b"x-retry-attempt", str(attempt + 1).encode()),
                (b"x-original-topic", original_topic.encode()),
                (b"x-error", str(exc).encode()),
            ]
            await producer.send_and_wait(
                retry_topic,
                value=msg.value,
                key=msg.key,
                headers=headers,
            )
            logger.warning("retry attempt=%d topic=%s order_id=%s", attempt + 1, retry_topic, msg.key)
        else:
            await producer.send_and_wait(dlq_topic, value=msg.value, key=msg.key)
            logger.error("dlq topic=%s order_id=%s error=%s", dlq_topic, msg.key, exc)
        await consumer.commit()
    except NonRetryableError as exc:
        await producer.send_and_wait(dlq_topic, value=msg.value, key=msg.key)
        logger.error("non-retryable dlq topic=%s order_id=%s error=%s", dlq_topic, msg.key, exc)
        await consumer.commit()


def _label(ms: int) -> str:
    if ms < 60_000:
        return f"{ms // 1000}s"
    return f"{ms // 60_000}m"

Что происходит при RetryableError:

  1. Listener выбрасывает RetryableError.
  2. _dispatch публикует событие в orders.confirmed.retry-5s с заголовками x-retry-attempt=1, x-original-topic.
  3. Коммитит offset основного топика — poll-цикл продолжается без блокировки.
  4. Отдельный consumer читает *.retry-5s; при повторной ошибке — *.retry-30s.
  5. После 4-й попытки — *.dlq, оператор видит алерт.

Consumer retry-топика с delayed processing

Retry-consumer читает свой топик. Delay реализуется через разницу между timestamp события и asyncio.sleep — не через блокировку poll-цикла самого listener.

import time

async def run_retry_consumer(
    consumer: AIOKafkaConsumer,
    producer: AIOKafkaProducer,
    handler: Callable[[dict], Awaitable[None]],
    delay_ms: int,
    dlq_topic: str,
) -> None:
    async for msg in consumer:
        elapsed_ms = int(time.time() * 1000) - msg.timestamp
        remaining_ms = delay_ms - elapsed_ms
        if remaining_ms > 0:
            await asyncio.sleep(remaining_ms / 1000)
        await _dispatch(msg, consumer, producer, handler, dlq_topic)

Consumer читает сообщение как только оно доступно, а ждёт оставшийся delay до обработки. Таким образом poll-цикл не блокирует брокер, а сам consumer справедливо распределяет нагрузку.

Использование: billing при OrderConfirmed

from pydantic import BaseModel
from uuid import UUID
from decimal import Decimal


class OrderConfirmedEvent(BaseModel):
    event_id: UUID
    order_id: UUID
    customer_id: UUID
    total_amount: Decimal


async def handle_order_confirmed(payload: dict) -> None:
    event = OrderConfirmedEvent(**payload)
    try:
        await billing_service.charge(event.order_id, event.customer_id, event.total_amount)
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code >= 500:
            raise RetryableError(f"billing-provider 5xx: {exc.response.status_code}") from exc
        raise NonRetryableError(f"billing-provider 4xx: {exc.response.status_code}") from exc
    except asyncio.TimeoutError as exc:
        raise RetryableError("billing-provider timeout") from exc

Что retry-ить, что нет

R-KFK-RTRY-2: разные исключения — разные стратегии.

Тип ошибкиRetry?Причина
asyncio.TimeoutError, ConnectionErrorДасетевая проблема, временная
HTTP 5xx от downstreamДасервер перегружен или недоступен
asyncpg.TooManyConnectionsErrorДапул подключений перегружен
asyncpg.DeadlockDetectedErrorДаконкурентный write, retry с reload
HTTP 4xx от downstreamНетконтракт нарушен, retry не починит
pydantic.ValidationErrorНетданные невалидны, retry бессмыслен
TypeError, AttributeErrorНетbug в коде, retry не поможет
ValueError (бизнес-инвариант)Нетбизнес-логика отказала, не временно

Запуск consumer-задач

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from app.settings import KafkaSettings


async def start_billing_consumers(settings: KafkaSettings) -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.brokers,
        enable_idempotence=True,
        value_serializer=lambda v: v,
    )
    await producer.start()

    consumers = [
        _make_consumer(settings, "orders.confirmed", "billing-order-confirmed"),
        _make_consumer(settings, "orders.confirmed.retry-5s", "billing-order-confirmed-retry-5s"),
        _make_consumer(settings, "orders.confirmed.retry-30s", "billing-order-confirmed-retry-30s"),
        _make_consumer(settings, "orders.confirmed.retry-5m", "billing-order-confirmed-retry-5m"),
    ]
    for c in consumers:
        await c.start()

    dlq = "orders.confirmed.dlq"

    await asyncio.gather(
        run_with_retry(consumers[0], producer, handle_order_confirmed, dlq),
        run_retry_consumer(consumers[1], producer, handle_order_confirmed, 5_000, dlq),
        run_retry_consumer(consumers[2], producer, handle_order_confirmed, 30_000, dlq),
        run_retry_consumer(consumers[3], producer, handle_order_confirmed, 300_000, dlq),
    )


def _make_consumer(settings: KafkaSettings, topic: str, group_id: str) -> AIOKafkaConsumer:
    return AIOKafkaConsumer(
        topic,
        bootstrap_servers=settings.brokers,
        group_id=group_id,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: b,
    )

DLQ monitoring

R-KFK-RTRY-3: alert на размер DLQ.

- alert: KafkaDlqBacklog
  expr: |
    kafka_topic_partition_current_offset{topic=~".*\\.dlq"}
    - kafka_consumer_group_current_offset{topic=~".*\\.dlq",group="ops-monitoring"} > 5
  for: 15m
  labels:
    severity: critical
  annotations:
    summary: "DLQ {{ $labels.topic }} > 5 необработанных событий"
    runbook: https://runbooks.internal/kafka-dlq

Threshold зависит от критичности: для orders.confirmed.dlq> 1 (одно событие = непроведённый платёж = инцидент); для аналитики — > 1000 (потеря части данных приемлема).

Без алерта DLQ превращается в свалку — события накапливаются месяцами, команда узнаёт о проблемах через жалобы.

Replay из DLQ

R-KFK-RTRY-4: ручная операция через admin-endpoint.

from fastapi import APIRouter, Depends
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

router = APIRouter(prefix="/admin/dlq", tags=["admin"])


@router.post("/{dlq_topic}/replay")
async def replay_dlq(
    dlq_topic: str,
    target_topic: str,
    producer: AIOKafkaProducer = Depends(get_producer),
    _: None = Depends(require_admin),
) -> dict:
    consumer = AIOKafkaConsumer(
        dlq_topic,
        bootstrap_servers=settings.brokers,
        group_id=f"dlq-replay-{dlq_topic}",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    replayed = 0
    try:
        async for msg in consumer:
            await producer.send_and_wait(target_topic, value=msg.value, key=msg.key)
            await consumer.commit()
            replayed += 1
    finally:
        await consumer.stop()
    return {"replayed": replayed, "target_topic": target_topic}

Почему не автоматический retry из DLQ:

  • В DLQ попало событие с багом в обработчике. Автоматический возврат → снова ошибка → снова DLQ. Цикл без выхода.
  • Может быть событие с corrupted payload — replay бессмыслен, нужен DELETE.
  • Восстановление инфры (payment-provider вернулся через 8 часов) — операционное решение, не код.

Оператор смотрит в DLQ через KafkaUI или admin-API, по каждому событию решает: replay, drop, или ручное восстановление.

Что запрещено

АнтипаттернПравилоЧто взамен
Blocking retry через asyncio.sleep в listenerR-KFK-RTRY-X1публикация в retry-topic + commit
except Exception: await consumer.commit()R-KFK-RTRY-X2throw + retry-topic либо DLQ
Retry-топик без ограничения попытокR-KFK-RTRY-X33–4 попытки через x-retry-attempt → DLQ
DLQ без алертаR-KFK-RTRY-X4size-alert обязателен
Retry на 4xx / ValidationErrorR-KFK-RTRY-2NonRetryableError → сразу DLQ
Авто-replay из DLQ по расписаниюR-KFK-RTRY-4ручной разбор через admin-endpoint
Auto-create retry-топиков в продеR-KFK-RTRY-1явное создание через admin CLI
Один retry-топик на все события сервисаR-KFK-RTRY-1per-listener retry-топики

Куда дальше

  • Consumer — почему blocking-операции ломают poll-цикл в aiokafka.
  • Idempotent consumer — replay из DLQ = дубль; как защититься.
  • Observability — consumer lag, DLQ size alerts через prometheus-client.
  • Event design — структура payload, event_id, forward-compatible schema.
  • Конфигурация — KafkaSettings через pydantic-settings, проверка топиков при старте.
  • Producer — идемпотентный producer, acks=all.
  • Outbox publishing — почему domain-события нельзя публиковать напрямую.
  • Security — TLS, ACL per-service.