Опирается на правила: R-KFK-OBS-1R-KFK-OBS-4 и R-KFK-OBS-X1 из Kafka Style Guide → раздел 8. Observability.

Важно знать

  • kafkajs не интегрируется с Micrometer — метрики собираются вручную через prom-client, подписываясь на instrumentation-события (consumer.events, producer.events).
  • Consumer lag — главный health-сигнал. Растущий lag = consumer не справляется или упал.
  • Alert на lag > N для критичных топиков в течение 5 минут — инцидент.
  • @opentelemetry/instrumentation-kafkajs автоматически кладёт traceparent в headers producer'а и продолжает trace на стороне consumer — custom-кода не требуется.
  • DLQ-size alert обязателен — каждое сообщение там требует разбора.
  • Без consumer-lag alerts пропадание сообщений замечается через жалобы пользователей.
  • Consumer lag отдельно экспортирует fetchOffsets vs latest через kafkajs Admin или из кластерного экспортёра (Kafka Exporter / Confluent).

Kafka pipeline без observability — чёрный ящик: producer публикует, consumer обрабатывает, но что между ними и как быстро — никто не видит. Consumer lag и DLQ-size — два маркера, по которым ops понимает «жив ли pipeline».

Метрики через prom-client

R-KFK-OBS-1: kafkajs публикует instrumentation-события на lifecycle своих клиентов — END_BATCH_PROCESS, REQUEST, COMMIT_OFFSETS и др. На каждое событие регистрируется обработчик, который обновляет prom-client Counter / Histogram.

Создаём отдельный KafkaMetricsService:

import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer, Kafka, Producer } from 'kafkajs';
import { Counter, Histogram, register } from 'prom-client';

@Injectable()
export class KafkaMetricsService implements OnModuleInit {
  private readonly consumerMessagesTotal = new Counter({
    name: 'kafka_consumer_messages_total',
    help: 'Total messages processed by consumer',
    labelNames: ['topic', 'group_id', 'status'],
  });

  private readonly consumerBatchDuration = new Histogram({
    name: 'kafka_consumer_batch_duration_seconds',
    help: 'Consumer batch processing duration',
    labelNames: ['topic', 'group_id'],
    buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
  });

  private readonly producerRequestsTotal = new Counter({
    name: 'kafka_producer_requests_total',
    help: 'Total producer requests',
    labelNames: ['topic', 'status'],
  });

  bindConsumer(consumer: Consumer, groupId: string): void {
    const { BATCH } = consumer.events;

    consumer.on(BATCH, ({ payload }) => {
      const { topic, duration, batch } = payload;
      this.consumerBatchDuration.labels(topic, groupId).observe(duration / 1000);
      this.consumerMessagesTotal
        .labels(topic, groupId, 'success')
        .inc(batch.messages.length);
    });
  }

  bindProducer(producer: Producer): void {
    const { REQUEST } = producer.events;

    producer.on(REQUEST, ({ payload }) => {
      const topic = String(payload.apiName ?? 'unknown');
      this.producerRequestsTotal.labels(topic, 'sent').inc();
    });
  }

  onModuleInit(): void {
    // prom-client регистрирует метрики глобально — /metrics подхватывает автоматически
  }
}

Вызов bindConsumer / bindProducer — при инициализации каждого клиента, до connect():

@Injectable()
export class OrderConsumerService implements OnApplicationBootstrap {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClientFactory,
    private readonly metrics: KafkaMetricsService,
  ) {
    this.consumer = this.kafka.createConsumer({ groupId: 'order-service-confirmed' });
    this.metrics.bindConsumer(this.consumer, 'order-service-confirmed');
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({ autoCommit: false, eachMessage: this.handle.bind(this) });
  }

  private async handle({ topic, partition, message, heartbeat }): Promise<void> {
    const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
    await this.orderHandler.handle(event);
    await this.consumer.commitOffsets([
      { topic, partition, offset: String(Number(message.offset) + 1) },
    ]);
  }
}

Ключевые метрики, которые собирает bind:

МетрикаЧто показывает
kafka_consumer_messages_total{topic,group_id,status}Сколько сообщений обработано
kafka_consumer_batch_duration_seconds{topic,group_id}Время обработки батча
kafka_producer_requests_total{topic,status}Запросы producer'а

Consumer lag — главный сигнал

R-KFK-OBS-2: lag показывает разрыв между последним committed offset consumer'а и latest offset в partition.

Kafkajs не экспортирует lag напрямую — два варианта:

Вариант 1 — кластерный экспортёр (рекомендуется для прода): Kafka Exporter или Confluent's kafka-lag-exporter публикует kafka_consumergroup_lag{consumergroup, topic, partition}. Не требует изменений в сервисе.

Вариант 2 — kafkajs Admin.fetchOffsets (для небольших сетапов):

@Injectable()
export class KafkaLagExporter implements OnApplicationBootstrap {
  private readonly consumerLag = new Gauge({
    name: 'kafka_consumer_lag',
    help: 'Consumer lag per topic partition',
    labelNames: ['topic', 'partition', 'group_id'],
  });

  constructor(private readonly kafka: Kafka) {}

  async onApplicationBootstrap(): Promise<void> {
    const admin = this.kafka.admin();
    await admin.connect();

    setInterval(async () => {
      const groups = ['order-service-confirmed', 'billing-order-confirmed'];
      for (const groupId of groups) {
        const offsets = await admin.fetchOffsets({ groupId, topics: ['orders.confirmed'] });
        const topicOffsets = await admin.fetchTopicOffsets('orders.confirmed');

        for (const partitionInfo of offsets[0].partitions) {
          const latest = topicOffsets.find((o) => o.partition === partitionInfo.partition);
          const lag = Number(latest?.high ?? 0) - Number(partitionInfo.offset);
          this.consumerLag.labels('orders.confirmed', String(partitionInfo.partition), groupId).set(lag);
        }
      }
    }, 15_000);
  }
}

Alert (Prometheus/Alertmanager):

- alert: KafkaConsumerLagHigh
  expr: |
    max by (topic, group_id) (kafka_consumer_lag) > 1000
  for: 5m
  annotations:
    summary: "Consumer {{ $labels.group_id }} лагает на topic {{ $labels.topic }}"
    runbook: https://runbooks.internal/kafka-consumer-lag

Threshold по критичности:

Тип событийThresholdПриоритет
Money events (payments.*)> 100Немедленно, SMS
Order events (orders.*)> 1000Ticket
Analytics events> 100 000Low priority

Растущий lag — три причины:

  • Consumer упал (CrashLoop, OOM): смотреть pod logs немедленно.
  • Consumer не справляется: concurrency < числа партиций; eachMessage слишком долгий; внешняя зависимость тормозит.
  • Rebalance в петле: нет вызова heartbeat() в долгой обработке — kafkajs считает consumer мёртвым.

Lag только на одной partition часто означает hot key — все события одного customerId или orderId сосредоточены там.

Tracing через traceparent

R-KFK-OBS-3: @opentelemetry/instrumentation-kafkajs автоматически инструментирует kafkajs — custom-кода не требуется.

Установка:

// tracing-init.ts (загружается до всего остального, NODE_OPTIONS=--require ./tracing-init.js)
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({ url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT }),
  instrumentations: [new KafkaJsInstrumentation()],
});

sdk.start();

После этого:

  • Producer: при каждом producer.send(...) инструментация вставляет traceparent и tracestate в message.headers.
  • Consumer: при получении сообщения с traceparent в headers instrumentation продолжает span — eachMessage становится дочерним span'ом исходного trace.

Итоговый путь в Tempo/Jaeger:

HTTP POST /orders/{id}/confirm
  → OrderCommandHandler.handle [span]
    → outbox INSERT [db span]
  → OutboxRelay.publishBatch [span]
    → producer.send orders.confirmed [kafka span] ← traceparent в headers
      → BillingConsumer.eachMessage orders.confirmed [kafka span]
        → BillingCommandHandler.chargeOrder [span]
          → UPDATE billing.charges [db span]

Весь межсервисный путь виден как единый trace без дополнительного кода в бизнес-логике.

DLQ size alert

R-KFK-OBS-4: каждый DLQ topic требует отдельного alert. Подробности — Retry topic + DLQ.

Для lag-экспортёра (вариант 2 выше): group_id для DLQ-consumer регистрируется отдельно, threshold = 0 (любое сообщение в DLQ — инцидент для money-топиков).

Что запрещено

АнтипаттернПравилоЧто взамен
Нет alert на consumer lagR-KFK-OBS-X1alert per критичность с runbook
Lag alert без for: 5mR-KFK-OBS-2for: 5m (избегаем flap на коротких rebalance)
Один общий lag threshold для всех топиковR-KFK-OBS-2money 100, orders 1000, analytics 100k
KafkaJsInstrumentation не подключёнR-KFK-OBS-3@opentelemetry/instrumentation-kafkajs в NodeSDK
traceparent header добавляется вручнуюR-KFK-OBS-3автоинструментация через OTel
Нет DLQ size alertR-KFK-OBS-4alert per DLQ topic
Producer request errors не алертятсяR-KFK-OBS-1Counter на kafka_producer_requests_total{status="error"}
bindConsumer/bindProducer после connect()R-KFK-OBS-1регистрировать listeners до вызова connect()

Куда дальше

  • Конфигурация — KafkaConfig, fail-fast на старте, brokers из env.
  • Consumer — autoCommit: false, heartbeat(), partitionsConsumedConcurrently.
  • Retry topic + DLQ — retry-топики, DLQ-monitoring детально.
  • Producer — идемпотентный producer, acks: -1, partition key.
  • Outbox publishing — outbox-relay, @Interval, FOR UPDATE SKIP LOCKED.
  • Event design — eventId, traceparent в payload, zod-схемы.
  • Idempotent consumer — processed_event, dedup в одной транзакции.
  • Security — TLS, SASL/SCRAM, per-service ACL.