Опирается на правила: R-KFK-OBS-1R-KFK-OBS-4 и R-KFK-OBS-X1 из Kafka Style Guide → раздел 8. Observability.

Важно знать

  • aiokafka не экспортирует метрики автоматически — нужно собирать вручную через prometheus-client или OTel SDK.
  • Consumer lag — главный health-сигнал: растущий lag = consumer не справляется или упал.
  • Alert на lag > N для критичных topic'ов в течение 5 минут — инцидент.
  • traceparent кладётся в Kafka headers вручную на producer, на consumer — извлекается и продолжает trace.
  • OTel instrumentation для aiokafka настраивается через opentelemetry-instrumentation-aiokafka.
  • DLQ-size alert обязателен — каждое сообщение в DLQ требует разбора.
  • Без lag alert провал pipeline замечают через жалобы пользователей.

Kafka pipeline без observability — чёрный ящик: producer публикует, consumer обрабатывает, что происходит между ними — неизвестно. В Python стек (aiokafka) метрики не появляются сами по себе: их нужно явно собрать и выставить. Consumer lag и DLQ-size — два маркера по которым ops понимает «жив ли pipeline».

Метрики через prometheus-client

R-KFK-OBS-1 — метрики producer/consumer через prometheus-client.

aiokafka предоставляет внутренние метрики через AIOKafkaProducer.metrics() и AIOKafkaConsumer.metrics() — словарь aiokafka-нейтральных измерений. Их нужно вычитывать и публиковать самостоятельно.

from prometheus_client import Gauge, Counter, Histogram
import asyncio

# Gauges для lag
consumer_lag = Gauge(
    "kafka_consumer_lag",
    "Consumer lag per partition",
    ["topic", "partition", "consumer_group"],
)
consumer_records_consumed = Counter(
    "kafka_consumer_records_consumed_total",
    "Total records consumed",
    ["topic"],
)
producer_records_sent = Counter(
    "kafka_producer_records_sent_total",
    "Total records sent",
    ["topic"],
)
producer_record_errors = Counter(
    "kafka_producer_record_errors_total",
    "Total send errors",
    ["topic"],
)
listener_processing_seconds = Histogram(
    "kafka_listener_processing_seconds",
    "Time spent processing one message",
    ["topic"],
)

Lag читается из consumer.end_offsets против consumer.position:

async def collect_lag_metrics(
    consumer: AIOKafkaConsumer,
    group_id: str,
) -> None:
    partitions = consumer.assignment()
    if not partitions:
        return
    end_offsets = await consumer.end_offsets(list(partitions))
    for tp, end_offset in end_offsets.items():
        position = await consumer.position(tp)
        lag = max(0, end_offset - position)
        consumer_lag.labels(
            topic=tp.topic,
            partition=str(tp.partition),
            consumer_group=group_id,
        ).set(lag)

Запуск в фоновой задаче рядом с consumer-циклом:

async def lag_scraper(consumer: AIOKafkaConsumer, group_id: str) -> None:
    while True:
        await collect_lag_metrics(consumer, group_id)
        await asyncio.sleep(15)

Метрики выставляются через стандартный /metrics endpoint — ASGI-совместимый prometheus_client.make_asgi_app() монтируется в FastAPI:

from prometheus_client import make_asgi_app
from fastapi import FastAPI

app = FastAPI()
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

Consumer lag — главный сигнал

R-KFK-OBS-2 — alert на lag для критичных topic'ов.

Пример Prometheus rule для orders.confirmed:

groups:
  - name: kafka.rules
    rules:
      - alert: KafkaConsumerLagHigh
        expr: |
          max by (topic, consumer_group) (kafka_consumer_lag) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer {{ $labels.consumer_group }} лагает на {{ $labels.topic }}"
          runbook: https://runbooks.internal/kafka-consumer-lag

      - alert: KafkaConsumerLagCritical
        expr: |
          max by (topic, consumer_group) (
            kafka_consumer_lag{topic=~"orders.*|payments.*"}
          ) > 100
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Критичный lag на money-топике {{ $labels.topic }}"
          runbook: https://runbooks.internal/kafka-consumer-lag-critical

Threshold зависит от критичности:

  • Payment-события> 100: SMS/Telegram-алерт, немедленно.
  • Order-события> 1000: ticket, разбираемся в течение часа.
  • Аналитика (product_viewed, search_clicked) — > 100000: low-priority.

Lag измеряется per (consumer_group, topic, partition). Высокий lag на одном partition — признак hot key: все OrderConfirmed одного customer_id идут на одну partition из-за keying по aggregate id.

Причины растущего lag:

  • Consumer упал (CrashLoop, OOM) — смотреть pod logs.
  • Обработка одного сообщения слишком долгая — внешняя система тормозит, нет circuit breaker.
  • max_poll_interval_ms превышается → постоянный rebalance → lag растёт.
  • Concurrency меньше числа partition'ов.

Инструментирование consumer-цикла

R-KFK-OBS-1 — замер времени обработки и счётчик consumed:

import time
from aiokafka import AIOKafkaConsumer
from app.domain.order.event import OrderConfirmed
from app.application.order.handler import OrderConfirmedHandler

async def run_consumer(settings: KafkaSettings) -> None:
    consumer = AIOKafkaConsumer(
        "orders.confirmed",
        bootstrap_servers=settings.brokers,
        group_id="billing-order-confirmed",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b),
    )
    await consumer.start()
    asyncio.create_task(lag_scraper(consumer, "billing-order-confirmed"))

    handler = OrderConfirmedHandler()
    async for msg in consumer:
        start = time.perf_counter()
        event = OrderConfirmed(**msg.value)
        await handler.handle(event)
        await consumer.commit()

        elapsed = time.perf_counter() - start
        listener_processing_seconds.labels(topic="orders.confirmed").observe(elapsed)
        consumer_records_consumed.labels(topic="orders.confirmed").inc()

Traceparent в Kafka headers

R-KFK-OBS-3 — distributed trace продолжается через Kafka.

OTel instrumentation (рекомендованный путь)

opentelemetry-instrumentation-aiokafka автоматически инжектирует traceparent в headers при send_and_wait и извлекает при получении сообщения:

pip install opentelemetry-instrumentation-aiokafka opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor

AIOKafkaInstrumentor().instrument()

После этого ничего custom-кода не нужно — любой AIOKafkaProducer.send_and_wait(...) автоматически кладёт traceparent в headers, AIOKafkaConsumer — извлекает и продолжает span.

Ручной propagation (fallback)

Если instrumentation недоступна или нужен контроль:

from opentelemetry import trace, propagate
from opentelemetry.propagators.textmap import DefaultSetter

class KafkaHeaderSetter(DefaultSetter):
    def set(self, carrier: dict, key: str, value: str) -> None:
        carrier[key] = value.encode()

def inject_trace_headers(headers: dict) -> dict:
    propagate.inject(headers, setter=KafkaHeaderSetter())
    return headers

# producer side — outbox-relay
async def publish_event(
    producer: AIOKafkaProducer,
    event: OutboxEvent,
) -> None:
    headers = inject_trace_headers({})
    await producer.send_and_wait(
        topic=event.topic,
        key=str(event.aggregate_id).encode(),
        value=json.dumps(event.payload).encode(),
        headers=list(headers.items()),
    )
from opentelemetry.propagators.textmap import DefaultGetter

class KafkaHeaderGetter(DefaultGetter):
    def get(self, carrier, key):
        value = carrier.get(key)
        if value is None:
            return []
        return [value.decode() if isinstance(value, bytes) else value]

    def keys(self, carrier):
        return list(carrier.keys())

# consumer side
async def process_message(msg) -> None:
    headers_dict = dict(msg.headers)
    ctx = propagate.extract(headers_dict, getter=KafkaHeaderGetter())
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("kafka.consume", context=ctx) as span:
        span.set_attribute("messaging.kafka.topic", msg.topic)
        span.set_attribute("messaging.kafka.partition", msg.partition)
        event = OrderConfirmed(**json.loads(msg.value))
        await handle_order_confirmed(event)

В Tempo/Jaeger trace выглядит как цепочка:

HTTP POST /orders/{id}/confirm
  └─ handler: OrderConfirmUseCase
       └─ outbox INSERT (PostgreSQL)
            └─ outbox-relay: kafka.produce → orders.confirmed
                 └─ billing-service: kafka.consume
                      └─ billing handler: ChargeCustomer
                           └─ payment-gateway HTTP call

DLQ size alert

R-KFK-OBS-4 — алерт на размер DLQ.

DLQ-топики (orders.confirmed.dlq, payments.processed.dlq) нуждаются в отдельном мониторинге. Через Kafka JMX или kminion:

- alert: KafkaDLQNotEmpty
  expr: |
    kafka_topic_partitions_messages_in_total{topic=~".*\\.dlq"} > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Сообщения в DLQ {{ $labels.topic }}"
    runbook: https://runbooks.internal/kafka-dlq

Альтернатива — consumer-based мониторинг: отдельный consumer читает DLQ и инкрементирует счётчик:

dlq_messages_total = Counter(
    "kafka_dlq_messages_total",
    "Messages landed in DLQ",
    ["topic"],
)

async def monitor_dlq(settings: KafkaSettings) -> None:
    consumer = AIOKafkaConsumer(
        "orders.confirmed.dlq",
        bootstrap_servers=settings.brokers,
        group_id="dlq-monitor",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    async for msg in consumer:
        dlq_messages_total.labels(topic=msg.topic).inc()
        await consumer.commit()

Что запрещено

АнтипаттернПравилоЧто взамен
Нет alert на kafka_consumer_lagR-KFK-OBS-X1alert per критичность с runbook
Lag alert без for: 5mR-KFK-OBS-2for: 5m — избегаем flap на коротком spike
Один universal threshold для всех topic'овR-KFK-OBS-2money ≤ 100, orders ≤ 1000, analytics ≤ 100000
AIOKafkaInstrumentor не вызванR-KFK-OBS-3AIOKafkaInstrumentor().instrument() в app startup
Нет DLQ size alertR-KFK-OBS-4alert per DLQ-топик
Producer errors не алертятсяR-KFK-OBS-1kafka_producer_record_errors_total > 0
Метрики без service/env labelsR-KFK-OBS-1добавить через prometheus_client default labels
Lag scraper не запущен рядом с consumerR-KFK-OBS-1asyncio.create_task(lag_scraper(...)) в startup

Куда дальше

  • Конфигурация — KafkaSettings через pydantic-settings, bootstrap_servers из env.
  • Consumer — manual commit, max_poll_interval_ms, rebalance.
  • Retry topic + DLQ — retry-топики, DLQ-routing, max-attempts.
  • Producer — идемпотентный producer, partition key.
  • Outbox publishing — outbox-relay, FOR UPDATE SKIP LOCKED.
  • Idempotent consumer — processed_event, dedup под race.
  • Event design — @dataclass(frozen=True), forward-compat schema.
  • Security — TLS, SASL_SSL, per-service ACL.