Опирается на правила:
R-KFK-OBS-1…R-KFK-OBS-4иR-KFK-OBS-X1из Kafka Style Guide → раздел 8. Observability.
Важно знать
segmentio/kafka-goне встраивается в Micrometer — метрики регистрируются вручную черезpromauto.kafka_consumer_lag— главный health-сигнал; растущий lag означает, что consumer не справляется или упал.- Alert на lag > N для критичных топиков в течение 5 минут — инцидент (
R-KFK-OBS-2).traceparentпередаётся в Kafka headers через OTel propagator; consumer извлекает и продолжает span (R-KFK-OBS-3).- DLQ-size alert обязателен — каждое сообщение в DLQ требует разбора (
R-KFK-OBS-4).- Lag экспонирует
kafka.Reader.Stats()— читается в отдельной горутине, не в poll-цикле.- Без consumer-lag alerts пропадание сообщений обнаруживается по жалобам пользователей (
R-KFK-OBS-X1).processing_errors_totalс меткойerror_kindпозволяет разделить transient и poison-pill.
Kafka-пайплайн без observability — чёрный ящик: producer публикует, consumer обрабатывает, но что между ними и насколько быстро — никто не видит. Consumer lag и DLQ-size — два маркера, по которым команда понимает «жив ли pipeline»; без них инцидент обнаруживается через жалобы.
Регистрация метрик (R-KFK-OBS-1)
segmentio/kafka-go не имеет встроенной Micrometer-интеграции. Метрики регистрируются через promauto при инициализации writer/reader:
// infra/kafka/metrics.go
package kafka
import "github.com/prometheus/client_golang/prometheus/promauto"
import "github.com/prometheus/client_golang/prometheus"
var (
messagesProduced = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_produced_total",
Help: "Total Kafka messages produced",
}, []string{"topic"})
messagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_consumed_total",
Help: "Total Kafka messages consumed",
}, []string{"topic", "group"})
processingErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_processing_errors_total",
Help: "Kafka consumer processing errors",
}, []string{"topic", "group", "error_kind"})
dlqMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_dlq_messages_total",
Help: "Messages routed to DLQ",
}, []string{"topic"})
consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "kafka_consumer_lag",
Help: "Current consumer lag per topic/group",
}, []string{"topic", "group"})
processingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "kafka_processing_duration_seconds",
Help: "Message processing duration",
Buckets: prometheus.DefBuckets,
}, []string{"topic", "group"})
)
Инкремент в consumer-цикле
Счётчики обновляются непосредственно в Run-горутине:
// adapters/in/kafka/order_confirmed_consumer.go
func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("fetch: %w", err)
}
start := time.Now()
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
processingErrors.WithLabelValues(
c.topic, c.groupID, "unmarshal",
).Inc()
dlqMessages.WithLabelValues(c.topic).Inc()
c.sendToDLQ(ctx, msg, err)
_ = c.reader.CommitMessages(ctx, msg)
continue
}
if err := c.handler.Handle(ctx, evt); err != nil {
kind := errorKind(err)
processingErrors.WithLabelValues(c.topic, c.groupID, kind).Inc()
if isTransient(err) {
c.sendToRetry(ctx, msg)
} else {
dlqMessages.WithLabelValues(c.topic).Inc()
c.sendToDLQ(ctx, msg, err)
}
} else {
messagesConsumed.WithLabelValues(c.topic, c.groupID).Inc()
}
processingDuration.WithLabelValues(c.topic, c.groupID).
Observe(time.Since(start).Seconds())
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit: %w", err)
}
}
}
func errorKind(err error) string {
var ge *GatewayError
if errors.As(err, &ge) {
return "gateway"
}
return "internal"
}
Инкремент на producer
// infra/kafka/outbox_relay.go — после WriteMessages
if err := r.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("publish %s: %w", e.EventType, err)
}
messagesProduced.WithLabelValues(topicFor(e.EventType)).Inc()
Consumer lag (R-KFK-OBS-1, R-KFK-OBS-2)
kafka.Reader.Stats() возвращает kafka.ReaderStats, в котором есть Lag int64 — текущий lag суммарно по группе. Читается в отдельной горутине, не в poll-цикле (чтобы не блокировать fetch):
// infra/kafka/lag_exporter.go
type LagExporter struct {
reader *kafka.Reader
topic string
groupID string
}
func (e *LagExporter) Run(ctx context.Context) {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := e.reader.Stats()
consumerLag.WithLabelValues(e.topic, e.groupID).
Set(float64(stats.Lag))
}
}
}
LagExporter запускается в main рядом с consumer-горутиной:
// cmd/billing/main.go
go lagExporter.Run(ctx)
go orderConsumer.Run(ctx)
Alert на consumer lag
R-KFK-OBS-2: threshold зависит от критичности топика.
# deploy/alerts/kafka.yml
groups:
- name: kafka
rules:
- alert: KafkaConsumerLagHighCritical
expr: |
kafka_consumer_lag{group=~"billing-.*|payment-.*"} > 1000
for: 5m
labels:
severity: critical
annotations:
summary: "Consumer {{ $labels.group }} lag на {{ $labels.topic }}"
runbook: "https://runbooks.internal/kafka-consumer-lag"
- alert: KafkaConsumerLagHighOrders
expr: |
kafka_consumer_lag{group=~"order-.*"} > 5000
for: 5m
labels:
severity: warning
- alert: KafkaDLQGrowing
expr: |
increase(kafka_dlq_messages_total[1h]) > 10
labels:
severity: critical
annotations:
summary: "DLQ растёт: {{ $labels.topic }}"
Что означает растущий lag:
- Consumer упал (OOM, panic). Смотреть pod logs немедленно.
- Consumer не справляется — throughput упал из-за медленной внешней системы или тяжёлой обработки.
- Rebalance в петле —
MaxWaitили таймаут контекста меньше времени обработки.
Tracing через traceparent (R-KFK-OBS-3)
Distributed trace должен проходить сквозь Kafka: HTTP request → outbox publish → consumer handle. Для этого producer инжектирует текущий OTel-контекст в headers, consumer извлекает и продолжает span.
Producer — inject
// infra/kafka/outbox_relay.go
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
func (r *OutboxRelay) buildMessage(ctx context.Context, e OutboxEvent) kafka.Message {
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
headers := make([]kafka.Header, 0, len(carrier))
for k, v := range carrier {
headers = append(headers, kafka.Header{Key: k, Value: []byte(v)})
}
return kafka.Message{
Topic: topicFor(e.EventType),
Key: []byte(e.AggregateID),
Value: e.Payload,
Headers: headers,
}
}
Consumer — extract и продолжение span
// adapters/in/kafka/order_confirmed_consumer.go
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
func (c *OrderConfirmedConsumer) processMessage(
ctx context.Context,
msg kafka.Message,
) error {
carrier := propagation.MapCarrier{}
for _, h := range msg.Headers {
carrier[h.Key] = string(h.Value)
}
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
ctx, span := otel.Tracer("kafka-consumer").Start(
ctx,
"OrderConfirmed.process",
trace.WithSpanKind(trace.SpanKindConsumer),
)
defer span.End()
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
span.RecordError(err)
return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
}
span.SetAttributes(
attribute.String("order.id", evt.OrderID),
attribute.String("event.id", evt.EventID),
)
return c.handler.Handle(ctx, evt)
}
После этого в Tempo или Jaeger виден полный trace: HTTP request → outbox INSERT → Kafka publish → consumer handle → DB UPDATE. Debugging distributed flow перестаёт требовать ручной корреляции по времени.
Инициализация OTel
// cmd/billing/main.go
import "go.opentelemetry.io/otel"
import "go.opentelemetry.io/otel/propagation"
func main() {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
// ... TracerProvider через OTLP exporter
}
DLQ-size alert (R-KFK-OBS-4)
Counter kafka_dlq_messages_total инкрементируется при каждом routing в DLQ (показано выше). Alert:
- alert: KafkaDLQGrowing
expr: increase(kafka_dlq_messages_total[1h]) > 10
labels:
severity: critical
annotations:
summary: "DLQ {{ $labels.topic }} получил >10 сообщений за час"
runbook: "https://runbooks.internal/kafka-dlq-replay"
Replay из DLQ — отдельная admin-операция за auth, не автоматика (R-KFK-RTRY-4).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Нет alert на kafka_consumer_lag | R-KFK-OBS-X1 | alert per критичность с runbook |
| Lag читается внутри poll-цикла (блокировка) | R-KFK-OBS-1 | отдельная горутина LagExporter с ticker |
| Один порог lag для всех топиков | R-KFK-OBS-2 | money 1000, orders 5000, analytics 100000 |
Нет traceparent в Kafka headers | R-KFK-OBS-3 | OTel propagator inject на producer |
| Consumer не продолжает span из headers | R-KFK-OBS-3 | Extract + tracer.Start с SpanKindConsumer |
| Нет DLQ-size alert | R-KFK-OBS-4 | counter kafka_dlq_messages_total + alert |
processingErrors без метки error_kind | R-KFK-OBS-1 | метка error_kind разделяет transient и poison |
| Ошибки producer не считаются | R-KFK-OBS-1 | counter при WriteMessages error |
Куда дальше
- Конфигурация —
KafkaConfigчерезenvconfig, проверка топиков на старте. - Consumer —
CommitInterval: 0, manual commit,GroupID. - Retry topic + DLQ — retry-горутина вне poll-цикла, DLQ routing.
- Producer —
RequiredAcks: kafka.RequireAll, outbox-relay. - Outbox publishing —
FOR UPDATE SKIP LOCKED, batch relay. - Idempotent consumer —
processed_event,pgx.Tx. - Event design —
EventIDUUID v7, forward-compatible schema. - Security — TLS dialer, per-service
ClientID.