Опирается на правила:
R-KFK-OBS-1…R-KFK-OBS-4иR-KFK-OBS-X1из Kafka Style Guide → раздел 8. Observability.
Важно знать
- aiokafka не экспортирует метрики автоматически — нужно собирать вручную через
prometheus-clientили OTel SDK.- Consumer lag — главный health-сигнал: растущий lag = consumer не справляется или упал.
- Alert на lag > N для критичных topic'ов в течение 5 минут — инцидент.
traceparentкладётся в Kafka headers вручную на producer, на consumer — извлекается и продолжает trace.- OTel instrumentation для aiokafka настраивается через
opentelemetry-instrumentation-aiokafka.- DLQ-size alert обязателен — каждое сообщение в DLQ требует разбора.
- Без lag alert провал pipeline замечают через жалобы пользователей.
Kafka pipeline без observability — чёрный ящик: producer публикует, consumer обрабатывает, что происходит между ними — неизвестно. В Python стек (aiokafka) метрики не появляются сами по себе: их нужно явно собрать и выставить. Consumer lag и DLQ-size — два маркера по которым ops понимает «жив ли pipeline».
Метрики через prometheus-client
R-KFK-OBS-1 — метрики producer/consumer через prometheus-client.
aiokafka предоставляет внутренние метрики через AIOKafkaProducer.metrics() и AIOKafkaConsumer.metrics() — словарь aiokafka-нейтральных измерений. Их нужно вычитывать и публиковать самостоятельно.
from prometheus_client import Gauge, Counter, Histogram
import asyncio
# Gauges для lag
consumer_lag = Gauge(
"kafka_consumer_lag",
"Consumer lag per partition",
["topic", "partition", "consumer_group"],
)
consumer_records_consumed = Counter(
"kafka_consumer_records_consumed_total",
"Total records consumed",
["topic"],
)
producer_records_sent = Counter(
"kafka_producer_records_sent_total",
"Total records sent",
["topic"],
)
producer_record_errors = Counter(
"kafka_producer_record_errors_total",
"Total send errors",
["topic"],
)
listener_processing_seconds = Histogram(
"kafka_listener_processing_seconds",
"Time spent processing one message",
["topic"],
)
Lag читается из consumer.end_offsets против consumer.position:
async def collect_lag_metrics(
consumer: AIOKafkaConsumer,
group_id: str,
) -> None:
partitions = consumer.assignment()
if not partitions:
return
end_offsets = await consumer.end_offsets(list(partitions))
for tp, end_offset in end_offsets.items():
position = await consumer.position(tp)
lag = max(0, end_offset - position)
consumer_lag.labels(
topic=tp.topic,
partition=str(tp.partition),
consumer_group=group_id,
).set(lag)
Запуск в фоновой задаче рядом с consumer-циклом:
async def lag_scraper(consumer: AIOKafkaConsumer, group_id: str) -> None:
while True:
await collect_lag_metrics(consumer, group_id)
await asyncio.sleep(15)
Метрики выставляются через стандартный /metrics endpoint — ASGI-совместимый prometheus_client.make_asgi_app() монтируется в FastAPI:
from prometheus_client import make_asgi_app
from fastapi import FastAPI
app = FastAPI()
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
Consumer lag — главный сигнал
R-KFK-OBS-2 — alert на lag для критичных topic'ов.
Пример Prometheus rule для orders.confirmed:
groups:
- name: kafka.rules
rules:
- alert: KafkaConsumerLagHigh
expr: |
max by (topic, consumer_group) (kafka_consumer_lag) > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer {{ $labels.consumer_group }} лагает на {{ $labels.topic }}"
runbook: https://runbooks.internal/kafka-consumer-lag
- alert: KafkaConsumerLagCritical
expr: |
max by (topic, consumer_group) (
kafka_consumer_lag{topic=~"orders.*|payments.*"}
) > 100
for: 5m
labels:
severity: critical
annotations:
summary: "Критичный lag на money-топике {{ $labels.topic }}"
runbook: https://runbooks.internal/kafka-consumer-lag-critical
Threshold зависит от критичности:
- Payment-события —
> 100: SMS/Telegram-алерт, немедленно. - Order-события —
> 1000: ticket, разбираемся в течение часа. - Аналитика (
product_viewed,search_clicked) —> 100000: low-priority.
Lag измеряется per (consumer_group, topic, partition). Высокий lag на одном partition — признак hot key: все OrderConfirmed одного customer_id идут на одну partition из-за keying по aggregate id.
Причины растущего lag:
- Consumer упал (CrashLoop, OOM) — смотреть pod logs.
- Обработка одного сообщения слишком долгая — внешняя система тормозит, нет circuit breaker.
max_poll_interval_msпревышается → постоянный rebalance → lag растёт.- Concurrency меньше числа partition'ов.
Инструментирование consumer-цикла
R-KFK-OBS-1 — замер времени обработки и счётчик consumed:
import time
from aiokafka import AIOKafkaConsumer
from app.domain.order.event import OrderConfirmed
from app.application.order.handler import OrderConfirmedHandler
async def run_consumer(settings: KafkaSettings) -> None:
consumer = AIOKafkaConsumer(
"orders.confirmed",
bootstrap_servers=settings.brokers,
group_id="billing-order-confirmed",
enable_auto_commit=False,
auto_offset_reset="earliest",
value_deserializer=lambda b: json.loads(b),
)
await consumer.start()
asyncio.create_task(lag_scraper(consumer, "billing-order-confirmed"))
handler = OrderConfirmedHandler()
async for msg in consumer:
start = time.perf_counter()
event = OrderConfirmed(**msg.value)
await handler.handle(event)
await consumer.commit()
elapsed = time.perf_counter() - start
listener_processing_seconds.labels(topic="orders.confirmed").observe(elapsed)
consumer_records_consumed.labels(topic="orders.confirmed").inc()
Traceparent в Kafka headers
R-KFK-OBS-3 — distributed trace продолжается через Kafka.
OTel instrumentation (рекомендованный путь)
opentelemetry-instrumentation-aiokafka автоматически инжектирует traceparent в headers при send_and_wait и извлекает при получении сообщения:
pip install opentelemetry-instrumentation-aiokafka opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
AIOKafkaInstrumentor().instrument()
После этого ничего custom-кода не нужно — любой AIOKafkaProducer.send_and_wait(...) автоматически кладёт traceparent в headers, AIOKafkaConsumer — извлекает и продолжает span.
Ручной propagation (fallback)
Если instrumentation недоступна или нужен контроль:
from opentelemetry import trace, propagate
from opentelemetry.propagators.textmap import DefaultSetter
class KafkaHeaderSetter(DefaultSetter):
def set(self, carrier: dict, key: str, value: str) -> None:
carrier[key] = value.encode()
def inject_trace_headers(headers: dict) -> dict:
propagate.inject(headers, setter=KafkaHeaderSetter())
return headers
# producer side — outbox-relay
async def publish_event(
producer: AIOKafkaProducer,
event: OutboxEvent,
) -> None:
headers = inject_trace_headers({})
await producer.send_and_wait(
topic=event.topic,
key=str(event.aggregate_id).encode(),
value=json.dumps(event.payload).encode(),
headers=list(headers.items()),
)
from opentelemetry.propagators.textmap import DefaultGetter
class KafkaHeaderGetter(DefaultGetter):
def get(self, carrier, key):
value = carrier.get(key)
if value is None:
return []
return [value.decode() if isinstance(value, bytes) else value]
def keys(self, carrier):
return list(carrier.keys())
# consumer side
async def process_message(msg) -> None:
headers_dict = dict(msg.headers)
ctx = propagate.extract(headers_dict, getter=KafkaHeaderGetter())
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("kafka.consume", context=ctx) as span:
span.set_attribute("messaging.kafka.topic", msg.topic)
span.set_attribute("messaging.kafka.partition", msg.partition)
event = OrderConfirmed(**json.loads(msg.value))
await handle_order_confirmed(event)
В Tempo/Jaeger trace выглядит как цепочка:
HTTP POST /orders/{id}/confirm
└─ handler: OrderConfirmUseCase
└─ outbox INSERT (PostgreSQL)
└─ outbox-relay: kafka.produce → orders.confirmed
└─ billing-service: kafka.consume
└─ billing handler: ChargeCustomer
└─ payment-gateway HTTP call
DLQ size alert
R-KFK-OBS-4 — алерт на размер DLQ.
DLQ-топики (orders.confirmed.dlq, payments.processed.dlq) нуждаются в отдельном мониторинге. Через Kafka JMX или kminion:
- alert: KafkaDLQNotEmpty
expr: |
kafka_topic_partitions_messages_in_total{topic=~".*\\.dlq"} > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Сообщения в DLQ {{ $labels.topic }}"
runbook: https://runbooks.internal/kafka-dlq
Альтернатива — consumer-based мониторинг: отдельный consumer читает DLQ и инкрементирует счётчик:
dlq_messages_total = Counter(
"kafka_dlq_messages_total",
"Messages landed in DLQ",
["topic"],
)
async def monitor_dlq(settings: KafkaSettings) -> None:
consumer = AIOKafkaConsumer(
"orders.confirmed.dlq",
bootstrap_servers=settings.brokers,
group_id="dlq-monitor",
enable_auto_commit=False,
auto_offset_reset="earliest",
)
await consumer.start()
async for msg in consumer:
dlq_messages_total.labels(topic=msg.topic).inc()
await consumer.commit()
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Нет alert на kafka_consumer_lag | R-KFK-OBS-X1 | alert per критичность с runbook |
Lag alert без for: 5m | R-KFK-OBS-2 | for: 5m — избегаем flap на коротком spike |
| Один universal threshold для всех topic'ов | R-KFK-OBS-2 | money ≤ 100, orders ≤ 1000, analytics ≤ 100000 |
AIOKafkaInstrumentor не вызван | R-KFK-OBS-3 | AIOKafkaInstrumentor().instrument() в app startup |
| Нет DLQ size alert | R-KFK-OBS-4 | alert per DLQ-топик |
| Producer errors не алертятся | R-KFK-OBS-1 | kafka_producer_record_errors_total > 0 |
Метрики без service/env labels | R-KFK-OBS-1 | добавить через prometheus_client default labels |
| Lag scraper не запущен рядом с consumer | R-KFK-OBS-1 | asyncio.create_task(lag_scraper(...)) в startup |
Куда дальше
- Конфигурация —
KafkaSettingsчерезpydantic-settings,bootstrap_serversиз env. - Consumer — manual commit,
max_poll_interval_ms, rebalance. - Retry topic + DLQ — retry-топики, DLQ-routing, max-attempts.
- Producer — идемпотентный producer, partition key.
- Outbox publishing — outbox-relay,
FOR UPDATE SKIP LOCKED. - Idempotent consumer —
processed_event, dedup под race. - Event design —
@dataclass(frozen=True), forward-compat schema. - Security — TLS, SASL_SSL, per-service ACL.