Опирается на правила:
R-KFK-RTRY-1…R-KFK-RTRY-4иR-KFK-RTRY-X1…R-KFK-RTRY-X4из Kafka Style Guide → раздел 5. Retry topic + DLQ.
Важно знать
- Blocking retry через
asyncio.sleepв listener — антипаттерн, держит poll-цикл, провоцирует rebalance.- Non-blocking retry — отдельные топики с возрастающим delay:
*.retry-5s,*.retry-30s,*.retry-5m,*.dlq.AIOKafkaProducerпубликует в retry-topic и немедленно освобождает poll-цикл основного consumer.- Retry только для transient: сеть, 5xx downstream, DB timeout. Не retry: 4xx, validation, баги кода.
- DLQ-monitoring — alert если за час > N сообщений (
R-KFK-RTRY-3).- Replay из DLQ — ручная операция через admin-endpoint, не автоматическая.
- Проглатывание exception (
except Exception: await consumer.commit()) — событие потеряно безвозвратно.- Retry-топик без max-attempts = бесконечный lock-step с проблемной системой.
Aiokafka-consumer должен переживать временные сбои внешних систем без потери сообщений и без блокировки event-loop. Retry topic + DLQ — стандартный паттерн: каждая попытка публикуется в отдельный topic с увеличивающимся delay, после исчерпания попыток сообщение падает в DLQ для ручного разбора.
Структура retry-топиков
R-KFK-RTRY-1: отдельные топики, не блокирующий повтор.
orders.confirmed ← основной
orders.confirmed.retry-5s ← попытка 2: через 5 с
orders.confirmed.retry-30s ← попытка 3: через 30 с
orders.confirmed.retry-5m ← попытка 4: через 5 мин
orders.confirmed.dlq ← окончательный fail
Топики создаются явно через admin CLI или terraform — не auto-create, иначе typo создаст топик с дефолтными настройками в проде.
Retry-обёртка для aiokafka
Spring Kafka предоставляет @RetryableTopic из коробки; в Python этого нет — пишем обёртку вокруг consumer-loop. Главный принцип: при transient-ошибке публикуем событие в retry-топик и немедленно коммитим offset основного топика. Poll-цикл продолжается.
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Callable, Awaitable
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord
logger = logging.getLogger(__name__)
RETRY_SCHEDULE = [5_000, 30_000, 300_000] # мс: 5s, 30s, 5min
@dataclass(frozen=True)
class RetryMeta:
attempt: int
original_topic: str
class RetryableError(Exception):
pass
class NonRetryableError(Exception):
pass
async def run_with_retry(
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
handler: Callable[[dict], Awaitable[None]],
dlq_topic: str,
) -> None:
async for msg in consumer:
await _dispatch(msg, consumer, producer, handler, dlq_topic)
async def _dispatch(
msg: ConsumerRecord,
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
handler: Callable[[dict], Awaitable[None]],
dlq_topic: str,
) -> None:
attempt = int(msg.headers.get(b"x-retry-attempt", b"0"))
original_topic = (msg.headers.get(b"x-original-topic") or msg.topic.encode()).decode()
payload = json.loads(msg.value)
try:
await handler(payload)
await consumer.commit()
except RetryableError as exc:
if attempt < len(RETRY_SCHEDULE):
delay_ms = RETRY_SCHEDULE[attempt]
retry_topic = f"{original_topic}.retry-{_label(delay_ms)}"
headers = [
(b"x-retry-attempt", str(attempt + 1).encode()),
(b"x-original-topic", original_topic.encode()),
(b"x-error", str(exc).encode()),
]
await producer.send_and_wait(
retry_topic,
value=msg.value,
key=msg.key,
headers=headers,
)
logger.warning("retry attempt=%d topic=%s order_id=%s", attempt + 1, retry_topic, msg.key)
else:
await producer.send_and_wait(dlq_topic, value=msg.value, key=msg.key)
logger.error("dlq topic=%s order_id=%s error=%s", dlq_topic, msg.key, exc)
await consumer.commit()
except NonRetryableError as exc:
await producer.send_and_wait(dlq_topic, value=msg.value, key=msg.key)
logger.error("non-retryable dlq topic=%s order_id=%s error=%s", dlq_topic, msg.key, exc)
await consumer.commit()
def _label(ms: int) -> str:
if ms < 60_000:
return f"{ms // 1000}s"
return f"{ms // 60_000}m"
Что происходит при RetryableError:
- Listener выбрасывает
RetryableError. _dispatchпубликует событие вorders.confirmed.retry-5sс заголовкамиx-retry-attempt=1,x-original-topic.- Коммитит offset основного топика — poll-цикл продолжается без блокировки.
- Отдельный consumer читает
*.retry-5s; при повторной ошибке —*.retry-30s. - После 4-й попытки —
*.dlq, оператор видит алерт.
Consumer retry-топика с delayed processing
Retry-consumer читает свой топик. Delay реализуется через разницу между timestamp события и asyncio.sleep — не через блокировку poll-цикла самого listener.
import time
async def run_retry_consumer(
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
handler: Callable[[dict], Awaitable[None]],
delay_ms: int,
dlq_topic: str,
) -> None:
async for msg in consumer:
elapsed_ms = int(time.time() * 1000) - msg.timestamp
remaining_ms = delay_ms - elapsed_ms
if remaining_ms > 0:
await asyncio.sleep(remaining_ms / 1000)
await _dispatch(msg, consumer, producer, handler, dlq_topic)
Consumer читает сообщение как только оно доступно, а ждёт оставшийся delay до обработки. Таким образом poll-цикл не блокирует брокер, а сам consumer справедливо распределяет нагрузку.
Использование: billing при OrderConfirmed
from pydantic import BaseModel
from uuid import UUID
from decimal import Decimal
class OrderConfirmedEvent(BaseModel):
event_id: UUID
order_id: UUID
customer_id: UUID
total_amount: Decimal
async def handle_order_confirmed(payload: dict) -> None:
event = OrderConfirmedEvent(**payload)
try:
await billing_service.charge(event.order_id, event.customer_id, event.total_amount)
except httpx.HTTPStatusError as exc:
if exc.response.status_code >= 500:
raise RetryableError(f"billing-provider 5xx: {exc.response.status_code}") from exc
raise NonRetryableError(f"billing-provider 4xx: {exc.response.status_code}") from exc
except asyncio.TimeoutError as exc:
raise RetryableError("billing-provider timeout") from exc
Что retry-ить, что нет
R-KFK-RTRY-2: разные исключения — разные стратегии.
| Тип ошибки | Retry? | Причина |
|---|---|---|
asyncio.TimeoutError, ConnectionError | Да | сетевая проблема, временная |
| HTTP 5xx от downstream | Да | сервер перегружен или недоступен |
asyncpg.TooManyConnectionsError | Да | пул подключений перегружен |
asyncpg.DeadlockDetectedError | Да | конкурентный write, retry с reload |
| HTTP 4xx от downstream | Нет | контракт нарушен, retry не починит |
pydantic.ValidationError | Нет | данные невалидны, retry бессмыслен |
TypeError, AttributeError | Нет | bug в коде, retry не поможет |
ValueError (бизнес-инвариант) | Нет | бизнес-логика отказала, не временно |
Запуск consumer-задач
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from app.settings import KafkaSettings
async def start_billing_consumers(settings: KafkaSettings) -> None:
producer = AIOKafkaProducer(
bootstrap_servers=settings.brokers,
enable_idempotence=True,
value_serializer=lambda v: v,
)
await producer.start()
consumers = [
_make_consumer(settings, "orders.confirmed", "billing-order-confirmed"),
_make_consumer(settings, "orders.confirmed.retry-5s", "billing-order-confirmed-retry-5s"),
_make_consumer(settings, "orders.confirmed.retry-30s", "billing-order-confirmed-retry-30s"),
_make_consumer(settings, "orders.confirmed.retry-5m", "billing-order-confirmed-retry-5m"),
]
for c in consumers:
await c.start()
dlq = "orders.confirmed.dlq"
await asyncio.gather(
run_with_retry(consumers[0], producer, handle_order_confirmed, dlq),
run_retry_consumer(consumers[1], producer, handle_order_confirmed, 5_000, dlq),
run_retry_consumer(consumers[2], producer, handle_order_confirmed, 30_000, dlq),
run_retry_consumer(consumers[3], producer, handle_order_confirmed, 300_000, dlq),
)
def _make_consumer(settings: KafkaSettings, topic: str, group_id: str) -> AIOKafkaConsumer:
return AIOKafkaConsumer(
topic,
bootstrap_servers=settings.brokers,
group_id=group_id,
enable_auto_commit=False,
auto_offset_reset="earliest",
value_deserializer=lambda b: b,
)
DLQ monitoring
R-KFK-RTRY-3: alert на размер DLQ.
- alert: KafkaDlqBacklog
expr: |
kafka_topic_partition_current_offset{topic=~".*\\.dlq"}
- kafka_consumer_group_current_offset{topic=~".*\\.dlq",group="ops-monitoring"} > 5
for: 15m
labels:
severity: critical
annotations:
summary: "DLQ {{ $labels.topic }} > 5 необработанных событий"
runbook: https://runbooks.internal/kafka-dlq
Threshold зависит от критичности: для orders.confirmed.dlq — > 1 (одно событие = непроведённый платёж = инцидент); для аналитики — > 1000 (потеря части данных приемлема).
Без алерта DLQ превращается в свалку — события накапливаются месяцами, команда узнаёт о проблемах через жалобы.
Replay из DLQ
R-KFK-RTRY-4: ручная операция через admin-endpoint.
from fastapi import APIRouter, Depends
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
router = APIRouter(prefix="/admin/dlq", tags=["admin"])
@router.post("/{dlq_topic}/replay")
async def replay_dlq(
dlq_topic: str,
target_topic: str,
producer: AIOKafkaProducer = Depends(get_producer),
_: None = Depends(require_admin),
) -> dict:
consumer = AIOKafkaConsumer(
dlq_topic,
bootstrap_servers=settings.brokers,
group_id=f"dlq-replay-{dlq_topic}",
enable_auto_commit=False,
auto_offset_reset="earliest",
)
await consumer.start()
replayed = 0
try:
async for msg in consumer:
await producer.send_and_wait(target_topic, value=msg.value, key=msg.key)
await consumer.commit()
replayed += 1
finally:
await consumer.stop()
return {"replayed": replayed, "target_topic": target_topic}
Почему не автоматический retry из DLQ:
- В DLQ попало событие с багом в обработчике. Автоматический возврат → снова ошибка → снова DLQ. Цикл без выхода.
- Может быть событие с corrupted payload — replay бессмыслен, нужен
DELETE. - Восстановление инфры (payment-provider вернулся через 8 часов) — операционное решение, не код.
Оператор смотрит в DLQ через KafkaUI или admin-API, по каждому событию решает: replay, drop, или ручное восстановление.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Blocking retry через asyncio.sleep в listener | R-KFK-RTRY-X1 | публикация в retry-topic + commit |
except Exception: await consumer.commit() | R-KFK-RTRY-X2 | throw + retry-topic либо DLQ |
| Retry-топик без ограничения попыток | R-KFK-RTRY-X3 | 3–4 попытки через x-retry-attempt → DLQ |
| DLQ без алерта | R-KFK-RTRY-X4 | size-alert обязателен |
Retry на 4xx / ValidationError | R-KFK-RTRY-2 | NonRetryableError → сразу DLQ |
| Авто-replay из DLQ по расписанию | R-KFK-RTRY-4 | ручной разбор через admin-endpoint |
| Auto-create retry-топиков в проде | R-KFK-RTRY-1 | явное создание через admin CLI |
| Один retry-топик на все события сервиса | R-KFK-RTRY-1 | per-listener retry-топики |
Куда дальше
- Consumer — почему blocking-операции ломают poll-цикл в aiokafka.
- Idempotent consumer — replay из DLQ = дубль; как защититься.
- Observability — consumer lag, DLQ size alerts через prometheus-client.
- Event design — структура payload,
event_id, forward-compatible schema. - Конфигурация —
KafkaSettingsчерез pydantic-settings, проверка топиков при старте. - Producer — идемпотентный producer, acks=all.
- Outbox publishing — почему domain-события нельзя публиковать напрямую.
- Security — TLS, ACL per-service.