Опирается на правила: R-KFK-RTRY-1R-KFK-RTRY-4 и R-KFK-RTRY-X1R-KFK-RTRY-X4 из Kafka Style Guide → раздел 5. Retry topic + DLQ.

Важно знать

  • time.Sleep в основной poll-горутине — антипаттерн: блокирует FetchMessage, Kafka считает consumer мёртвым и запускает rebalance.
  • Non-blocking retry реализуется через отдельные retry-топики (*.retry.5s, *.retry.30s) и отдельные горутины-relay с задержкой вне poll-цикла.
  • isTransient(err) — через errors.As по типу, не по строке сообщения.
  • Retry только для transient: сетевые ошибки, 5xx от downstream, DB timeout. Не retry: 4xx, validation, nil pointer.
  • Poison pill (ошибка десериализации) — сразу в DLQ, без retry.
  • Счётчик попыток обязателен: заголовок X-Retry-Attempts в kafka.Message.Headers; без лимита — бесконечный loop.
  • DLQ-monitoring — Prometheus counter + AlertManager правило; без алерта DLQ становится свалкой.
  • Replay из DLQ — admin HTTP endpoint за авторизацией, не автоматика.

Kafka-consumer на Go должен переживать временные сбои без потери сообщений и без остановки poll-цикла. В segmentio/kafka-go нет аналога @RetryableTopic — паттерн собирается вручную: основная горутина классифицирует ошибку и публикует сообщение в нужный retry-топик, отдельные relay-горутины обрабатывают их с задержкой.

Топология retry-топиков

R-KFK-RTRY-1: отдельные топики, не блокирующий retry.

orders.confirmed              ← основной
orders.confirmed.retry.5s     ← retry через 5 с
orders.confirmed.retry.30s    ← retry через 30 с
orders.confirmed.dlq          ← окончательный fail

Каждый retry-топик обслуживается своей горутиной с time.After(delay) внутри poll-цикла той горутины — не основной. Таким образом основная горутина никогда не засыпает.

Основная consumer-горутина

// adapters/in/kafka/order_confirmed_consumer.go

type OrderConfirmedConsumer struct {
    reader    *kafka.Reader
    handler   *OrderConfirmedHandler
    retryWriter *kafka.Writer  // → orders.confirmed.retry.5s
    dlqWriter   *kafka.Writer  // → orders.confirmed.dlq
}

func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                return nil
            }
            return fmt.Errorf("fetch order confirmed: %w", err)
        }

        if err := c.process(ctx, msg); err != nil {
            slog.ErrorContext(ctx, "order confirmed processing failed",
                "offset", msg.Offset,
                "error", err,
            )
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit order confirmed: %w", err)
        }
    }
}

func (c *OrderConfirmedConsumer) process(ctx context.Context, msg kafka.Message) error {
    var evt OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        // poison pill — десериализация не починится ретраем (R-KFK-RTRY-2)
        return c.sendToDLQ(ctx, msg, fmt.Errorf("unmarshal: %w", err))
    }

    if err := c.handler.Handle(ctx, evt); err != nil {
        if isTransient(err) {
            return c.sendToRetry(ctx, msg)   // R-KFK-RTRY-1
        }
        return c.sendToDLQ(ctx, msg, err)   // non-transient → DLQ
    }
    return nil
}

Commit происходит в любом случае — после успешной обработки, после publish в retry или DLQ. Основная горутина никогда не вызывает time.Sleep.

Retry relay с задержкой

// infra/kafka/retry_relay.go

type RetryRelay struct {
    reader          *kafka.Reader
    nextWriter      *kafka.Writer // следующий уровень retry или DLQ
    handler         *OrderConfirmedHandler
    delay           time.Duration
    maxAttempts     int
    dlqWriter       *kafka.Writer
    errorsTotal     prometheus.Counter
}

func (r *RetryRelay) Run(ctx context.Context) error {
    for {
        msg, err := r.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                return nil
            }
            return fmt.Errorf("retry relay fetch: %w", err)
        }

        // задержка ВНУТРИ ЭТОЙ горутины, не основной (R-KFK-RTRY-X1)
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(r.delay):
        }

        attempts := parseAttempts(msg.Headers) + 1
        if attempts >= r.maxAttempts {
            // лимит исчерпан → DLQ (R-KFK-RTRY-X3: без лимита не делать)
            if err := writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts)); err != nil {
                slog.ErrorContext(ctx, "dlq write failed", "error", err)
            }
            _ = r.reader.CommitMessages(ctx, msg)
            r.errorsTotal.Inc()
            continue
        }

        var evt OrderConfirmedEvent
        if err := json.Unmarshal(msg.Value, &evt); err != nil {
            _ = writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts))
            _ = r.reader.CommitMessages(ctx, msg)
            continue
        }

        if err := r.handler.Handle(ctx, evt); err != nil {
            if isTransient(err) {
                _ = writeTo(ctx, r.nextWriter, withAttempts(msg, attempts))
            } else {
                _ = writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts))
            }
        }
        _ = r.reader.CommitMessages(ctx, msg)
    }
}

Счётчик попыток хранится в заголовке сообщения:

const headerRetryAttempts = "X-Retry-Attempts"

func parseAttempts(headers []kafka.Header) int {
    for _, h := range headers {
        if h.Key == headerRetryAttempts {
            n, _ := strconv.Atoi(string(h.Value))
            return n
        }
    }
    return 0
}

func withAttempts(msg kafka.Message, attempts int) kafka.Message {
    headers := make([]kafka.Header, 0, len(msg.Headers)+1)
    for _, h := range msg.Headers {
        if h.Key != headerRetryAttempts {
            headers = append(headers, h)
        }
    }
    headers = append(headers, kafka.Header{
        Key:   headerRetryAttempts,
        Value: []byte(strconv.Itoa(attempts)),
    })
    return kafka.Message{
        Key:     msg.Key,
        Value:   msg.Value,
        Headers: headers,
    }
}

Классификация ошибок

R-KFK-RTRY-2: transient vs non-transient — через errors.As по типу, не по строке.

// core/order/errors.go

type GatewayError struct {
    StatusCode int
    Upstream   string
    Err        error
}

func (e *GatewayError) Error() string {
    return fmt.Sprintf("gateway %s %d: %v", e.Upstream, e.StatusCode, e.Err)
}

func (e *GatewayError) Unwrap() error { return e.Err }
// adapters/in/kafka/classify.go

func isTransient(err error) bool {
    var ge *GatewayError
    if errors.As(err, &ge) {
        return ge.StatusCode >= 500 // 5xx — временная проблема upstream
    }
    var te *pgconn.PgError
    if errors.As(err, &te) {
        return te.Code == "40001" // serialization failure — retry ОК
    }
    return errors.Is(err, context.DeadlineExceeded)
}
Тип ошибкиRetry?Причина
*GatewayError с StatusCode >= 500Даupstream временно недоступен
context.DeadlineExceededДатаймаут, восстановится
*pgconn.PgError код 40001Даserialization failure
*GatewayError с StatusCode < 500Нетконтракт нарушен, retry не поможет
ошибка json.UnmarshalНетpoison pill, данные невалидны
ошибка валидации бизнес-правилаНетбизнес-инвариант нарушен, не временно

Пример в handler:

// adapters/out/payment/client.go

func (c *PaymentClient) Charge(ctx context.Context, orderID string, amount int64) error {
    resp, err := c.http.Post(ctx, "/charges", ChargeRequest{OrderID: orderID, Amount: amount})
    if err != nil {
        return &GatewayError{Upstream: "payment", StatusCode: 0, Err: err}
    }
    if resp.StatusCode >= 500 {
        return &GatewayError{Upstream: "payment", StatusCode: resp.StatusCode, Err: errors.New(resp.Status)}
    }
    if resp.StatusCode >= 400 {
        return &GatewayError{Upstream: "payment", StatusCode: resp.StatusCode, Err: errors.New(resp.Status)}
    }
    return nil
}

DLQ monitoring

R-KFK-RTRY-3: alert на размер DLQ.

Счётчик в relay-горутине:

var dlqMessagesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "kafka_dlq_messages_total",
    Help: "Messages sent to DLQ",
}, []string{"topic"})

// при отправке в DLQ:
dlqMessagesTotal.WithLabelValues("orders.confirmed.dlq").Inc()

AlertManager правило:

- alert: KafkaDlqBacklog
  expr: increase(kafka_dlq_messages_total{topic=~".*\\.dlq"}[1h]) > 10
  for: 5m
  annotations:
    summary: "DLQ {{ $labels.topic }} получил > 10 сообщений за час"
    runbook: "https://runbooks.internal/kafka-dlq"

Без алерта DLQ превращается в свалку: события накапливаются незаметно, каждое — непроведённая бизнес-операция. Для денежных операций порог > 1 — одно событие в DLQ уже инцидент.

Replay из DLQ

R-KFK-RTRY-4: ручная операция через admin endpoint.

// adapters/in/http/admin_dlq_handler.go

type DLQReplayHandler struct {
    dlqReader   *kafka.Reader
    mainWriter  *kafka.Writer
}

func (h *DLQReplayHandler) Replay(w http.ResponseWriter, r *http.Request) {
    topic := chi.URLParam(r, "topic")
    limitStr := r.URL.Query().Get("limit")
    limit, _ := strconv.Atoi(limitStr)
    if limit <= 0 || limit > 100 {
        limit = 10
    }

    replayed := 0
    for replayed < limit {
        ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
        msg, err := h.dlqReader.FetchMessage(ctx)
        cancel()
        if err != nil {
            break
        }

        if err := writeTo(r.Context(), h.mainWriter, msg); err != nil {
            slog.ErrorContext(r.Context(), "replay write failed", "topic", topic, "error", err)
            break
        }
        _ = h.dlqReader.CommitMessages(r.Context(), msg)
        replayed++
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]int{"replayed": replayed})
}

Replay не автоматический — bug в коде обработки отправит событие обратно в DLQ. Ops-команда смотрит на событие, исправляет причину, только потом реплеит.

Запуск горутин в main

// main.go

func run(ctx context.Context, cfg Config) error {
    // reader конфиги
    mainReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Kafka.Brokers,
        Topic:          cfg.Kafka.Topics.OrdersConfirmed,
        GroupID:        "billing-order-confirmed",
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
    })
    retry5sReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Kafka.Brokers,
        Topic:          cfg.Kafka.Topics.OrdersConfirmedRetry5s,
        GroupID:        "billing-order-confirmed-retry-5s",
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
    })

    // writer'ы
    retry5sWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedRetry5s)
    retry30sWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedRetry30s)
    dlqWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedDLQ)

    handler := buildOrderConfirmedHandler(cfg)

    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        c := &OrderConfirmedConsumer{
            reader:      mainReader,
            handler:     handler,
            retryWriter: retry5sWriter,
            dlqWriter:   dlqWriter,
        }
        return c.Run(ctx)
    })

    g.Go(func() error {
        r := &RetryRelay{
            reader:      retry5sReader,
            nextWriter:  retry30sWriter,
            handler:     handler,
            delay:       5 * time.Second,
            maxAttempts: 3,
            dlqWriter:   dlqWriter,
        }
        return r.Run(ctx)
    })

    // аналогично для retry.30s → dlq

    return g.Wait()
}

Что запрещено

АнтипаттернПравилоЧто взамен
time.Sleep в основной poll-горутинеR-KFK-RTRY-X1отдельная relay-горутина с задержкой
if err != nil { slog.Error(...); /* commit */ }R-KFK-RTRY-X2publish в retry или DLQ, затем commit
retry-топик без счётчика попытокR-KFK-RTRY-X3заголовок X-Retry-Attempts + maxAttempts
DLQ без алертаR-KFK-RTRY-X4kafka_dlq_messages_total + AlertManager
isTransient по строке ошибкиR-KFK-RTRY-2errors.As по типу
auto-replay из DLQR-KFK-RTRY-4admin endpoint за авторизацией
общий retry-топик для всех listener-ов сервисаR-KFK-RTRY-1per-listener retry топики

Куда дальше

  • Consumer — почему time.Sleep в poll-горутине ломает consumer.
  • Idempotent consumer — replay из DLQ = дубль; как защититься через processed_event.
  • Observability — consumer lag, DLQ size alerts, promauto.
  • Event design — EventID в payload; poison pill и схема сообщения.
  • Конфигурация — KafkaConfig через envconfig; retry-топики как required:"true".
  • Producer — RequiredAcks: kafka.RequireAll; ключ partition для ordering.
  • Outbox publishing — почему domain-событие не публикуется напрямую из handler.
  • Security — TLS-dialer; per-service ClientID для ACL.