Опирается на правила: R-KFK-OBS-1R-KFK-OBS-4 и R-KFK-OBS-X1 из Kafka Style Guide → раздел 8. Observability.

Важно знать

  • segmentio/kafka-go не встраивается в Micrometer — метрики регистрируются вручную через promauto.
  • kafka_consumer_lag — главный health-сигнал; растущий lag означает, что consumer не справляется или упал.
  • Alert на lag > N для критичных топиков в течение 5 минут — инцидент (R-KFK-OBS-2).
  • traceparent передаётся в Kafka headers через OTel propagator; consumer извлекает и продолжает span (R-KFK-OBS-3).
  • DLQ-size alert обязателен — каждое сообщение в DLQ требует разбора (R-KFK-OBS-4).
  • Lag экспонирует kafka.Reader.Stats() — читается в отдельной горутине, не в poll-цикле.
  • Без consumer-lag alerts пропадание сообщений обнаруживается по жалобам пользователей (R-KFK-OBS-X1).
  • processing_errors_total с меткой error_kind позволяет разделить transient и poison-pill.

Kafka-пайплайн без observability — чёрный ящик: producer публикует, consumer обрабатывает, но что между ними и насколько быстро — никто не видит. Consumer lag и DLQ-size — два маркера, по которым команда понимает «жив ли pipeline»; без них инцидент обнаруживается через жалобы.

Регистрация метрик (R-KFK-OBS-1)

segmentio/kafka-go не имеет встроенной Micrometer-интеграции. Метрики регистрируются через promauto при инициализации writer/reader:

// infra/kafka/metrics.go
package kafka

import "github.com/prometheus/client_golang/prometheus/promauto"
import "github.com/prometheus/client_golang/prometheus"

var (
    messagesProduced = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "kafka_messages_produced_total",
        Help: "Total Kafka messages produced",
    }, []string{"topic"})

    messagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "kafka_messages_consumed_total",
        Help: "Total Kafka messages consumed",
    }, []string{"topic", "group"})

    processingErrors = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "kafka_processing_errors_total",
        Help: "Kafka consumer processing errors",
    }, []string{"topic", "group", "error_kind"})

    dlqMessages = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "kafka_dlq_messages_total",
        Help: "Messages routed to DLQ",
    }, []string{"topic"})

    consumerLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "kafka_consumer_lag",
        Help: "Current consumer lag per topic/group",
    }, []string{"topic", "group"})

    processingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "kafka_processing_duration_seconds",
        Help:    "Message processing duration",
        Buckets: prometheus.DefBuckets,
    }, []string{"topic", "group"})
)

Инкремент в consumer-цикле

Счётчики обновляются непосредственно в Run-горутине:

// adapters/in/kafka/order_confirmed_consumer.go
func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                return nil
            }
            return fmt.Errorf("fetch: %w", err)
        }

        start := time.Now()
        var evt OrderConfirmedEvent
        if err := json.Unmarshal(msg.Value, &evt); err != nil {
            processingErrors.WithLabelValues(
                c.topic, c.groupID, "unmarshal",
            ).Inc()
            dlqMessages.WithLabelValues(c.topic).Inc()
            c.sendToDLQ(ctx, msg, err)
            _ = c.reader.CommitMessages(ctx, msg)
            continue
        }

        if err := c.handler.Handle(ctx, evt); err != nil {
            kind := errorKind(err)
            processingErrors.WithLabelValues(c.topic, c.groupID, kind).Inc()
            if isTransient(err) {
                c.sendToRetry(ctx, msg)
            } else {
                dlqMessages.WithLabelValues(c.topic).Inc()
                c.sendToDLQ(ctx, msg, err)
            }
        } else {
            messagesConsumed.WithLabelValues(c.topic, c.groupID).Inc()
        }

        processingDuration.WithLabelValues(c.topic, c.groupID).
            Observe(time.Since(start).Seconds())

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit: %w", err)
        }
    }
}

func errorKind(err error) string {
    var ge *GatewayError
    if errors.As(err, &ge) {
        return "gateway"
    }
    return "internal"
}

Инкремент на producer

// infra/kafka/outbox_relay.go — после WriteMessages
if err := r.writer.WriteMessages(ctx, msg); err != nil {
    return fmt.Errorf("publish %s: %w", e.EventType, err)
}
messagesProduced.WithLabelValues(topicFor(e.EventType)).Inc()

Consumer lag (R-KFK-OBS-1, R-KFK-OBS-2)

kafka.Reader.Stats() возвращает kafka.ReaderStats, в котором есть Lag int64 — текущий lag суммарно по группе. Читается в отдельной горутине, не в poll-цикле (чтобы не блокировать fetch):

// infra/kafka/lag_exporter.go
type LagExporter struct {
    reader  *kafka.Reader
    topic   string
    groupID string
}

func (e *LagExporter) Run(ctx context.Context) {
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := e.reader.Stats()
            consumerLag.WithLabelValues(e.topic, e.groupID).
                Set(float64(stats.Lag))
        }
    }
}

LagExporter запускается в main рядом с consumer-горутиной:

// cmd/billing/main.go
go lagExporter.Run(ctx)
go orderConsumer.Run(ctx)

Alert на consumer lag

R-KFK-OBS-2: threshold зависит от критичности топика.

# deploy/alerts/kafka.yml
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLagHighCritical
        expr: |
          kafka_consumer_lag{group=~"billing-.*|payment-.*"} > 1000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer {{ $labels.group }} lag на {{ $labels.topic }}"
          runbook: "https://runbooks.internal/kafka-consumer-lag"

      - alert: KafkaConsumerLagHighOrders
        expr: |
          kafka_consumer_lag{group=~"order-.*"} > 5000
        for: 5m
        labels:
          severity: warning

      - alert: KafkaDLQGrowing
        expr: |
          increase(kafka_dlq_messages_total[1h]) > 10
        labels:
          severity: critical
        annotations:
          summary: "DLQ растёт: {{ $labels.topic }}"

Что означает растущий lag:

  • Consumer упал (OOM, panic). Смотреть pod logs немедленно.
  • Consumer не справляется — throughput упал из-за медленной внешней системы или тяжёлой обработки.
  • Rebalance в петлеMaxWait или таймаут контекста меньше времени обработки.

Tracing через traceparent (R-KFK-OBS-3)

Distributed trace должен проходить сквозь Kafka: HTTP request → outbox publish → consumer handle. Для этого producer инжектирует текущий OTel-контекст в headers, consumer извлекает и продолжает span.

Producer — inject

// infra/kafka/outbox_relay.go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

func (r *OutboxRelay) buildMessage(ctx context.Context, e OutboxEvent) kafka.Message {
    carrier := propagation.MapCarrier{}
    otel.GetTextMapPropagator().Inject(ctx, carrier)

    headers := make([]kafka.Header, 0, len(carrier))
    for k, v := range carrier {
        headers = append(headers, kafka.Header{Key: k, Value: []byte(v)})
    }
    return kafka.Message{
        Topic:   topicFor(e.EventType),
        Key:     []byte(e.AggregateID),
        Value:   e.Payload,
        Headers: headers,
    }
}

Consumer — extract и продолжение span

// adapters/in/kafka/order_confirmed_consumer.go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)

func (c *OrderConfirmedConsumer) processMessage(
    ctx context.Context,
    msg kafka.Message,
) error {
    carrier := propagation.MapCarrier{}
    for _, h := range msg.Headers {
        carrier[h.Key] = string(h.Value)
    }
    ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)

    ctx, span := otel.Tracer("kafka-consumer").Start(
        ctx,
        "OrderConfirmed.process",
        trace.WithSpanKind(trace.SpanKindConsumer),
    )
    defer span.End()

    var evt OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        span.RecordError(err)
        return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
    }

    span.SetAttributes(
        attribute.String("order.id", evt.OrderID),
        attribute.String("event.id", evt.EventID),
    )

    return c.handler.Handle(ctx, evt)
}

После этого в Tempo или Jaeger виден полный trace: HTTP request → outbox INSERT → Kafka publish → consumer handle → DB UPDATE. Debugging distributed flow перестаёт требовать ручной корреляции по времени.

Инициализация OTel

// cmd/billing/main.go
import "go.opentelemetry.io/otel"
import "go.opentelemetry.io/otel/propagation"

func main() {
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    ))
    // ... TracerProvider через OTLP exporter
}

DLQ-size alert (R-KFK-OBS-4)

Counter kafka_dlq_messages_total инкрементируется при каждом routing в DLQ (показано выше). Alert:

- alert: KafkaDLQGrowing
  expr: increase(kafka_dlq_messages_total[1h]) > 10
  labels:
    severity: critical
  annotations:
    summary: "DLQ {{ $labels.topic }} получил >10 сообщений за час"
    runbook: "https://runbooks.internal/kafka-dlq-replay"

Replay из DLQ — отдельная admin-операция за auth, не автоматика (R-KFK-RTRY-4).

Что запрещено

АнтипаттернПравилоЧто взамен
Нет alert на kafka_consumer_lagR-KFK-OBS-X1alert per критичность с runbook
Lag читается внутри poll-цикла (блокировка)R-KFK-OBS-1отдельная горутина LagExporter с ticker
Один порог lag для всех топиковR-KFK-OBS-2money 1000, orders 5000, analytics 100000
Нет traceparent в Kafka headersR-KFK-OBS-3OTel propagator inject на producer
Consumer не продолжает span из headersR-KFK-OBS-3Extract + tracer.Start с SpanKindConsumer
Нет DLQ-size alertR-KFK-OBS-4counter kafka_dlq_messages_total + alert
processingErrors без метки error_kindR-KFK-OBS-1метка error_kind разделяет transient и poison
Ошибки producer не считаютсяR-KFK-OBS-1counter при WriteMessages error

Куда дальше

  • Конфигурация — KafkaConfig через envconfig, проверка топиков на старте.
  • Consumer — CommitInterval: 0, manual commit, GroupID.
  • Retry topic + DLQ — retry-горутина вне poll-цикла, DLQ routing.
  • Producer — RequiredAcks: kafka.RequireAll, outbox-relay.
  • Outbox publishing — FOR UPDATE SKIP LOCKED, batch relay.
  • Idempotent consumer — processed_event, pgx.Tx.
  • Event design — EventID UUID v7, forward-compatible schema.
  • Security — TLS dialer, per-service ClientID.