Опирается на правила:
R-KFK-OBS-1…R-KFK-OBS-4иR-KFK-OBS-X1из Kafka Style Guide → раздел 8. Observability.
Важно знать
- kafkajs не интегрируется с Micrometer — метрики собираются вручную через
prom-client, подписываясь на instrumentation-события (consumer.events,producer.events).- Consumer lag — главный health-сигнал. Растущий lag = consumer не справляется или упал.
- Alert на lag > N для критичных топиков в течение 5 минут — инцидент.
@opentelemetry/instrumentation-kafkajsавтоматически кладётtraceparentв headers producer'а и продолжает trace на стороне consumer — custom-кода не требуется.- DLQ-size alert обязателен — каждое сообщение там требует разбора.
- Без consumer-lag alerts пропадание сообщений замечается через жалобы пользователей.
- Consumer lag отдельно экспортирует
fetchOffsetsvs latest через kafkajsAdminили из кластерного экспортёра (Kafka Exporter / Confluent).
Kafka pipeline без observability — чёрный ящик: producer публикует, consumer обрабатывает, но что между ними и как быстро — никто не видит. Consumer lag и DLQ-size — два маркера, по которым ops понимает «жив ли pipeline».
Метрики через prom-client
R-KFK-OBS-1: kafkajs публикует instrumentation-события на lifecycle своих клиентов — END_BATCH_PROCESS, REQUEST, COMMIT_OFFSETS и др. На каждое событие регистрируется обработчик, который обновляет prom-client Counter / Histogram.
Создаём отдельный KafkaMetricsService:
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer, Kafka, Producer } from 'kafkajs';
import { Counter, Histogram, register } from 'prom-client';
@Injectable()
export class KafkaMetricsService implements OnModuleInit {
private readonly consumerMessagesTotal = new Counter({
name: 'kafka_consumer_messages_total',
help: 'Total messages processed by consumer',
labelNames: ['topic', 'group_id', 'status'],
});
private readonly consumerBatchDuration = new Histogram({
name: 'kafka_consumer_batch_duration_seconds',
help: 'Consumer batch processing duration',
labelNames: ['topic', 'group_id'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});
private readonly producerRequestsTotal = new Counter({
name: 'kafka_producer_requests_total',
help: 'Total producer requests',
labelNames: ['topic', 'status'],
});
bindConsumer(consumer: Consumer, groupId: string): void {
const { BATCH } = consumer.events;
consumer.on(BATCH, ({ payload }) => {
const { topic, duration, batch } = payload;
this.consumerBatchDuration.labels(topic, groupId).observe(duration / 1000);
this.consumerMessagesTotal
.labels(topic, groupId, 'success')
.inc(batch.messages.length);
});
}
bindProducer(producer: Producer): void {
const { REQUEST } = producer.events;
producer.on(REQUEST, ({ payload }) => {
const topic = String(payload.apiName ?? 'unknown');
this.producerRequestsTotal.labels(topic, 'sent').inc();
});
}
onModuleInit(): void {
// prom-client регистрирует метрики глобально — /metrics подхватывает автоматически
}
}
Вызов bindConsumer / bindProducer — при инициализации каждого клиента, до connect():
@Injectable()
export class OrderConsumerService implements OnApplicationBootstrap {
private readonly consumer: Consumer;
constructor(
private readonly kafka: KafkaClientFactory,
private readonly metrics: KafkaMetricsService,
) {
this.consumer = this.kafka.createConsumer({ groupId: 'order-service-confirmed' });
this.metrics.bindConsumer(this.consumer, 'order-service-confirmed');
}
async onApplicationBootstrap(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
await this.consumer.run({ autoCommit: false, eachMessage: this.handle.bind(this) });
}
private async handle({ topic, partition, message, heartbeat }): Promise<void> {
const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
await this.orderHandler.handle(event);
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
}
}
Ключевые метрики, которые собирает bind:
| Метрика | Что показывает |
|---|---|
kafka_consumer_messages_total{topic,group_id,status} | Сколько сообщений обработано |
kafka_consumer_batch_duration_seconds{topic,group_id} | Время обработки батча |
kafka_producer_requests_total{topic,status} | Запросы producer'а |
Consumer lag — главный сигнал
R-KFK-OBS-2: lag показывает разрыв между последним committed offset consumer'а и latest offset в partition.
Kafkajs не экспортирует lag напрямую — два варианта:
Вариант 1 — кластерный экспортёр (рекомендуется для прода): Kafka Exporter или Confluent's kafka-lag-exporter публикует kafka_consumergroup_lag{consumergroup, topic, partition}. Не требует изменений в сервисе.
Вариант 2 — kafkajs Admin.fetchOffsets (для небольших сетапов):
@Injectable()
export class KafkaLagExporter implements OnApplicationBootstrap {
private readonly consumerLag = new Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag per topic partition',
labelNames: ['topic', 'partition', 'group_id'],
});
constructor(private readonly kafka: Kafka) {}
async onApplicationBootstrap(): Promise<void> {
const admin = this.kafka.admin();
await admin.connect();
setInterval(async () => {
const groups = ['order-service-confirmed', 'billing-order-confirmed'];
for (const groupId of groups) {
const offsets = await admin.fetchOffsets({ groupId, topics: ['orders.confirmed'] });
const topicOffsets = await admin.fetchTopicOffsets('orders.confirmed');
for (const partitionInfo of offsets[0].partitions) {
const latest = topicOffsets.find((o) => o.partition === partitionInfo.partition);
const lag = Number(latest?.high ?? 0) - Number(partitionInfo.offset);
this.consumerLag.labels('orders.confirmed', String(partitionInfo.partition), groupId).set(lag);
}
}
}, 15_000);
}
}
Alert (Prometheus/Alertmanager):
- alert: KafkaConsumerLagHigh
expr: |
max by (topic, group_id) (kafka_consumer_lag) > 1000
for: 5m
annotations:
summary: "Consumer {{ $labels.group_id }} лагает на topic {{ $labels.topic }}"
runbook: https://runbooks.internal/kafka-consumer-lag
Threshold по критичности:
| Тип событий | Threshold | Приоритет |
|---|---|---|
Money events (payments.*) | > 100 | Немедленно, SMS |
Order events (orders.*) | > 1000 | Ticket |
| Analytics events | > 100 000 | Low priority |
Растущий lag — три причины:
- Consumer упал (CrashLoop, OOM): смотреть pod logs немедленно.
- Consumer не справляется: concurrency < числа партиций;
eachMessageслишком долгий; внешняя зависимость тормозит. - Rebalance в петле: нет вызова
heartbeat()в долгой обработке — kafkajs считает consumer мёртвым.
Lag только на одной partition часто означает hot key — все события одного customerId или orderId сосредоточены там.
Tracing через traceparent
R-KFK-OBS-3: @opentelemetry/instrumentation-kafkajs автоматически инструментирует kafkajs — custom-кода не требуется.
Установка:
// tracing-init.ts (загружается до всего остального, NODE_OPTIONS=--require ./tracing-init.js)
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';
const sdk = new NodeSDK({
traceExporter: new OTLPTraceExporter({ url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT }),
instrumentations: [new KafkaJsInstrumentation()],
});
sdk.start();
После этого:
- Producer: при каждом
producer.send(...)инструментация вставляетtraceparentиtracestateвmessage.headers. - Consumer: при получении сообщения с
traceparentв headers instrumentation продолжает span —eachMessageстановится дочерним span'ом исходного trace.
Итоговый путь в Tempo/Jaeger:
HTTP POST /orders/{id}/confirm
→ OrderCommandHandler.handle [span]
→ outbox INSERT [db span]
→ OutboxRelay.publishBatch [span]
→ producer.send orders.confirmed [kafka span] ← traceparent в headers
→ BillingConsumer.eachMessage orders.confirmed [kafka span]
→ BillingCommandHandler.chargeOrder [span]
→ UPDATE billing.charges [db span]
Весь межсервисный путь виден как единый trace без дополнительного кода в бизнес-логике.
DLQ size alert
R-KFK-OBS-4: каждый DLQ topic требует отдельного alert. Подробности — Retry topic + DLQ.
Для lag-экспортёра (вариант 2 выше): group_id для DLQ-consumer регистрируется отдельно, threshold = 0 (любое сообщение в DLQ — инцидент для money-топиков).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Нет alert на consumer lag | R-KFK-OBS-X1 | alert per критичность с runbook |
Lag alert без for: 5m | R-KFK-OBS-2 | for: 5m (избегаем flap на коротких rebalance) |
| Один общий lag threshold для всех топиков | R-KFK-OBS-2 | money 100, orders 1000, analytics 100k |
KafkaJsInstrumentation не подключён | R-KFK-OBS-3 | @opentelemetry/instrumentation-kafkajs в NodeSDK |
| traceparent header добавляется вручную | R-KFK-OBS-3 | автоинструментация через OTel |
| Нет DLQ size alert | R-KFK-OBS-4 | alert per DLQ topic |
| Producer request errors не алертятся | R-KFK-OBS-1 | Counter на kafka_producer_requests_total{status="error"} |
bindConsumer/bindProducer после connect() | R-KFK-OBS-1 | регистрировать listeners до вызова connect() |
Куда дальше
- Конфигурация —
KafkaConfig, fail-fast на старте, brokers из env. - Consumer —
autoCommit: false,heartbeat(),partitionsConsumedConcurrently. - Retry topic + DLQ — retry-топики, DLQ-monitoring детально.
- Producer — идемпотентный producer,
acks: -1, partition key. - Outbox publishing — outbox-relay,
@Interval,FOR UPDATE SKIP LOCKED. - Event design —
eventId,traceparentв payload, zod-схемы. - Idempotent consumer —
processed_event, dedup в одной транзакции. - Security — TLS, SASL/SCRAM, per-service ACL.