---
title: "Observability Kafka (Node) — prom-client, consumer lag, traceparent"
nav_title: "Observability"
excerpt: "Метрики Kafka в NestJS через prom-client: instrumentation-события kafkajs, alert на consumer lag, traceparent через OTel, DLQ-size alert."
keywords: "Kafka observability NestJS, prom-client Kafka metrics, consumer lag alert Node, traceparent kafkajs, opentelemetry-instrumentation-kafkajs, R-KFK-OBS, kafkajs instrumentation events, DLQ monitoring Node"
focus_keyword: "Kafka observability NestJS"
tags: ["kafka", "node", "nestjs", "observability", "prom-client", "opentelemetry"]
---

# Observability Kafka (Node) — prom-client, consumer lag, traceparent

> **Опирается на правила:** `R-KFK-OBS-1` … `R-KFK-OBS-4` и `R-KFK-OBS-X1` из Kafka Style Guide → [раздел 8. Observability](/standards/backend/kafka/#8-observability).

> **Важно знать**
> - kafkajs **не интегрируется с Micrometer** — метрики собираются вручную через `prom-client`, подписываясь на instrumentation-события (`consumer.events`, `producer.events`).
> - **Consumer lag** — главный health-сигнал. Растущий lag = consumer не справляется или упал.
> - **Alert на lag > N** для критичных топиков в течение 5 минут — инцидент.
> - **`@opentelemetry/instrumentation-kafkajs`** автоматически кладёт `traceparent` в headers producer'а и продолжает trace на стороне consumer — custom-кода не требуется.
> - **DLQ-size alert** обязателен — каждое сообщение там требует разбора.
> - **Без consumer-lag alerts** пропадание сообщений замечается через жалобы пользователей.
> - Consumer lag отдельно экспортирует `fetchOffsets` vs latest через kafkajs `Admin` или из кластерного экспортёра (Kafka Exporter / Confluent).

Kafka pipeline без observability — чёрный ящик: producer публикует, consumer обрабатывает, но что между ними и как быстро — никто не видит. Consumer lag и DLQ-size — два маркера, по которым ops понимает «жив ли pipeline».

## Метрики через prom-client

`R-KFK-OBS-1`: kafkajs публикует instrumentation-события на lifecycle своих клиентов — `END_BATCH_PROCESS`, `REQUEST`, `COMMIT_OFFSETS` и др. На каждое событие регистрируется обработчик, который обновляет `prom-client` Counter / Histogram.

Создаём отдельный `KafkaMetricsService`:

```ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer, Kafka, Producer } from 'kafkajs';
import { Counter, Histogram, register } from 'prom-client';

@Injectable()
export class KafkaMetricsService implements OnModuleInit {
  private readonly consumerMessagesTotal = new Counter({
    name: 'kafka_consumer_messages_total',
    help: 'Total messages processed by consumer',
    labelNames: ['topic', 'group_id', 'status'],
  });

  private readonly consumerBatchDuration = new Histogram({
    name: 'kafka_consumer_batch_duration_seconds',
    help: 'Consumer batch processing duration',
    labelNames: ['topic', 'group_id'],
    buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
  });

  private readonly producerRequestsTotal = new Counter({
    name: 'kafka_producer_requests_total',
    help: 'Total producer requests',
    labelNames: ['topic', 'status'],
  });

  bindConsumer(consumer: Consumer, groupId: string): void {
    const { BATCH } = consumer.events;

    consumer.on(BATCH, ({ payload }) => {
      const { topic, duration, batch } = payload;
      this.consumerBatchDuration.labels(topic, groupId).observe(duration / 1000);
      this.consumerMessagesTotal
        .labels(topic, groupId, 'success')
        .inc(batch.messages.length);
    });
  }

  bindProducer(producer: Producer): void {
    const { REQUEST } = producer.events;

    producer.on(REQUEST, ({ payload }) => {
      const topic = String(payload.apiName ?? 'unknown');
      this.producerRequestsTotal.labels(topic, 'sent').inc();
    });
  }

  onModuleInit(): void {
    // prom-client регистрирует метрики глобально — /metrics подхватывает автоматически
  }
}
```

Вызов `bindConsumer` / `bindProducer` — при инициализации каждого клиента, до `connect()`:

```ts
@Injectable()
export class OrderConsumerService implements OnApplicationBootstrap {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClientFactory,
    private readonly metrics: KafkaMetricsService,
  ) {
    this.consumer = this.kafka.createConsumer({ groupId: 'order-service-confirmed' });
    this.metrics.bindConsumer(this.consumer, 'order-service-confirmed');
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({ autoCommit: false, eachMessage: this.handle.bind(this) });
  }

  private async handle({ topic, partition, message, heartbeat }): Promise<void> {
    const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
    await this.orderHandler.handle(event);
    await this.consumer.commitOffsets([
      { topic, partition, offset: String(Number(message.offset) + 1) },
    ]);
  }
}
```

Ключевые метрики, которые собирает bind:

| Метрика | Что показывает |
|---|---|
| `kafka_consumer_messages_total{topic,group_id,status}` | Сколько сообщений обработано |
| `kafka_consumer_batch_duration_seconds{topic,group_id}` | Время обработки батча |
| `kafka_producer_requests_total{topic,status}` | Запросы producer'а |

## Consumer lag — главный сигнал

`R-KFK-OBS-2`: lag показывает разрыв между последним committed offset consumer'а и latest offset в partition.

Kafkajs не экспортирует lag напрямую — два варианта:

**Вариант 1 — кластерный экспортёр** (рекомендуется для прода): [Kafka Exporter](https://github.com/danielqsj/kafka_exporter) или Confluent's `kafka-lag-exporter` публикует `kafka_consumergroup_lag{consumergroup, topic, partition}`. Не требует изменений в сервисе.

**Вариант 2 — kafkajs `Admin.fetchOffsets`** (для небольших сетапов):

```ts
@Injectable()
export class KafkaLagExporter implements OnApplicationBootstrap {
  private readonly consumerLag = new Gauge({
    name: 'kafka_consumer_lag',
    help: 'Consumer lag per topic partition',
    labelNames: ['topic', 'partition', 'group_id'],
  });

  constructor(private readonly kafka: Kafka) {}

  async onApplicationBootstrap(): Promise<void> {
    const admin = this.kafka.admin();
    await admin.connect();

    setInterval(async () => {
      const groups = ['order-service-confirmed', 'billing-order-confirmed'];
      for (const groupId of groups) {
        const offsets = await admin.fetchOffsets({ groupId, topics: ['orders.confirmed'] });
        const topicOffsets = await admin.fetchTopicOffsets('orders.confirmed');

        for (const partitionInfo of offsets[0].partitions) {
          const latest = topicOffsets.find((o) => o.partition === partitionInfo.partition);
          const lag = Number(latest?.high ?? 0) - Number(partitionInfo.offset);
          this.consumerLag.labels('orders.confirmed', String(partitionInfo.partition), groupId).set(lag);
        }
      }
    }, 15_000);
  }
}
```

Alert (Prometheus/Alertmanager):

```yaml
- alert: KafkaConsumerLagHigh
  expr: |
    max by (topic, group_id) (kafka_consumer_lag) > 1000
  for: 5m
  annotations:
    summary: "Consumer {{ $labels.group_id }} лагает на topic {{ $labels.topic }}"
    runbook: https://runbooks.internal/kafka-consumer-lag
```

Threshold по критичности:

| Тип событий | Threshold | Приоритет |
|---|---|---|
| Money events (`payments.*`) | > 100 | Немедленно, SMS |
| Order events (`orders.*`) | > 1000 | Ticket |
| Analytics events | > 100 000 | Low priority |

Растущий lag — три причины:

- **Consumer упал** (CrashLoop, OOM): смотреть pod logs немедленно.
- **Consumer не справляется**: concurrency < числа партиций; `eachMessage` слишком долгий; внешняя зависимость тормозит.
- **Rebalance в петле**: нет вызова `heartbeat()` в долгой обработке — kafkajs считает consumer мёртвым.

Lag только на одной partition часто означает hot key — все события одного `customerId` или `orderId` сосредоточены там.

## Tracing через traceparent

`R-KFK-OBS-3`: `@opentelemetry/instrumentation-kafkajs` автоматически инструментирует kafkajs — custom-кода не требуется.

Установка:

```ts
// tracing-init.ts (загружается до всего остального, NODE_OPTIONS=--require ./tracing-init.js)
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({ url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT }),
  instrumentations: [new KafkaJsInstrumentation()],
});

sdk.start();
```

После этого:

- **Producer**: при каждом `producer.send(...)` инструментация вставляет `traceparent` и `tracestate` в `message.headers`.
- **Consumer**: при получении сообщения с `traceparent` в headers instrumentation продолжает span — `eachMessage` становится дочерним span'ом исходного trace.

Итоговый путь в Tempo/Jaeger:

```
HTTP POST /orders/{id}/confirm
  → OrderCommandHandler.handle [span]
    → outbox INSERT [db span]
  → OutboxRelay.publishBatch [span]
    → producer.send orders.confirmed [kafka span] ← traceparent в headers
      → BillingConsumer.eachMessage orders.confirmed [kafka span]
        → BillingCommandHandler.chargeOrder [span]
          → UPDATE billing.charges [db span]
```

Весь межсервисный путь виден как единый trace без дополнительного кода в бизнес-логике.

## DLQ size alert

`R-KFK-OBS-4`: каждый DLQ topic требует отдельного alert. Подробности — [Retry topic + DLQ](node/retry-and-dlq.md).

Для lag-экспортёра (вариант 2 выше): `group_id` для DLQ-consumer регистрируется отдельно, threshold = 0 (любое сообщение в DLQ — инцидент для money-топиков).

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Нет alert на consumer lag | `R-KFK-OBS-X1` | alert per критичность с runbook |
| Lag alert без `for: 5m` | `R-KFK-OBS-2` | `for: 5m` (избегаем flap на коротких rebalance) |
| Один общий lag threshold для всех топиков | `R-KFK-OBS-2` | money 100, orders 1000, analytics 100k |
| `KafkaJsInstrumentation` не подключён | `R-KFK-OBS-3` | `@opentelemetry/instrumentation-kafkajs` в NodeSDK |
| traceparent header добавляется вручную | `R-KFK-OBS-3` | автоинструментация через OTel |
| Нет DLQ size alert | `R-KFK-OBS-4` | alert per DLQ topic |
| Producer request errors не алертятся | `R-KFK-OBS-1` | Counter на `kafka_producer_requests_total{status="error"}` |
| `bindConsumer`/`bindProducer` после `connect()` | `R-KFK-OBS-1` | регистрировать listeners до вызова `connect()` |

## Куда дальше

- [Конфигурация](node/configuration.md) — `KafkaConfig`, fail-fast на старте, brokers из env.
- [Consumer](node/consumer.md) — `autoCommit: false`, `heartbeat()`, `partitionsConsumedConcurrently`.
- [Retry topic + DLQ](node/retry-and-dlq.md) — retry-топики, DLQ-monitoring детально.
- [Producer](node/producer.md) — идемпотентный producer, `acks: -1`, partition key.
- [Outbox publishing](node/outbox-publishing.md) — outbox-relay, `@Interval`, `FOR UPDATE SKIP LOCKED`.
- [Event design](node/event-design.md) — `eventId`, `traceparent` в payload, zod-схемы.
- [Idempotent consumer](node/idempotent-consumer.md) — `processed_event`, dedup в одной транзакции.
- [Security](node/security.md) — TLS, SASL/SCRAM, per-service ACL.
