Опирается на правила:
R-KFK-RTRY-1…R-KFK-RTRY-4иR-KFK-RTRY-X1…R-KFK-RTRY-X4из Kafka Style Guide → раздел 5. Retry topic + DLQ.
Важно знать
time.Sleepв основной poll-горутине — антипаттерн: блокируетFetchMessage, Kafka считает consumer мёртвым и запускает rebalance.- Non-blocking retry реализуется через отдельные retry-топики (
*.retry.5s,*.retry.30s) и отдельные горутины-relay с задержкой вне poll-цикла.isTransient(err)— черезerrors.Asпо типу, не по строке сообщения.- Retry только для transient: сетевые ошибки, 5xx от downstream, DB timeout. Не retry: 4xx, validation, nil pointer.
- Poison pill (ошибка десериализации) — сразу в DLQ, без retry.
- Счётчик попыток обязателен: заголовок
X-Retry-Attemptsвkafka.Message.Headers; без лимита — бесконечный loop.- DLQ-monitoring — Prometheus counter + AlertManager правило; без алерта DLQ становится свалкой.
- Replay из DLQ — admin HTTP endpoint за авторизацией, не автоматика.
Kafka-consumer на Go должен переживать временные сбои без потери сообщений и без остановки poll-цикла. В segmentio/kafka-go нет аналога @RetryableTopic — паттерн собирается вручную: основная горутина классифицирует ошибку и публикует сообщение в нужный retry-топик, отдельные relay-горутины обрабатывают их с задержкой.
Топология retry-топиков
R-KFK-RTRY-1: отдельные топики, не блокирующий retry.
orders.confirmed ← основной
orders.confirmed.retry.5s ← retry через 5 с
orders.confirmed.retry.30s ← retry через 30 с
orders.confirmed.dlq ← окончательный fail
Каждый retry-топик обслуживается своей горутиной с time.After(delay) внутри poll-цикла той горутины — не основной. Таким образом основная горутина никогда не засыпает.
Основная consumer-горутина
// adapters/in/kafka/order_confirmed_consumer.go
type OrderConfirmedConsumer struct {
reader *kafka.Reader
handler *OrderConfirmedHandler
retryWriter *kafka.Writer // → orders.confirmed.retry.5s
dlqWriter *kafka.Writer // → orders.confirmed.dlq
}
func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("fetch order confirmed: %w", err)
}
if err := c.process(ctx, msg); err != nil {
slog.ErrorContext(ctx, "order confirmed processing failed",
"offset", msg.Offset,
"error", err,
)
}
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit order confirmed: %w", err)
}
}
}
func (c *OrderConfirmedConsumer) process(ctx context.Context, msg kafka.Message) error {
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
// poison pill — десериализация не починится ретраем (R-KFK-RTRY-2)
return c.sendToDLQ(ctx, msg, fmt.Errorf("unmarshal: %w", err))
}
if err := c.handler.Handle(ctx, evt); err != nil {
if isTransient(err) {
return c.sendToRetry(ctx, msg) // R-KFK-RTRY-1
}
return c.sendToDLQ(ctx, msg, err) // non-transient → DLQ
}
return nil
}
Commit происходит в любом случае — после успешной обработки, после publish в retry или DLQ. Основная горутина никогда не вызывает time.Sleep.
Retry relay с задержкой
// infra/kafka/retry_relay.go
type RetryRelay struct {
reader *kafka.Reader
nextWriter *kafka.Writer // следующий уровень retry или DLQ
handler *OrderConfirmedHandler
delay time.Duration
maxAttempts int
dlqWriter *kafka.Writer
errorsTotal prometheus.Counter
}
func (r *RetryRelay) Run(ctx context.Context) error {
for {
msg, err := r.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("retry relay fetch: %w", err)
}
// задержка ВНУТРИ ЭТОЙ горутины, не основной (R-KFK-RTRY-X1)
select {
case <-ctx.Done():
return nil
case <-time.After(r.delay):
}
attempts := parseAttempts(msg.Headers) + 1
if attempts >= r.maxAttempts {
// лимит исчерпан → DLQ (R-KFK-RTRY-X3: без лимита не делать)
if err := writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts)); err != nil {
slog.ErrorContext(ctx, "dlq write failed", "error", err)
}
_ = r.reader.CommitMessages(ctx, msg)
r.errorsTotal.Inc()
continue
}
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
_ = writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts))
_ = r.reader.CommitMessages(ctx, msg)
continue
}
if err := r.handler.Handle(ctx, evt); err != nil {
if isTransient(err) {
_ = writeTo(ctx, r.nextWriter, withAttempts(msg, attempts))
} else {
_ = writeTo(ctx, r.dlqWriter, withAttempts(msg, attempts))
}
}
_ = r.reader.CommitMessages(ctx, msg)
}
}
Счётчик попыток хранится в заголовке сообщения:
const headerRetryAttempts = "X-Retry-Attempts"
func parseAttempts(headers []kafka.Header) int {
for _, h := range headers {
if h.Key == headerRetryAttempts {
n, _ := strconv.Atoi(string(h.Value))
return n
}
}
return 0
}
func withAttempts(msg kafka.Message, attempts int) kafka.Message {
headers := make([]kafka.Header, 0, len(msg.Headers)+1)
for _, h := range msg.Headers {
if h.Key != headerRetryAttempts {
headers = append(headers, h)
}
}
headers = append(headers, kafka.Header{
Key: headerRetryAttempts,
Value: []byte(strconv.Itoa(attempts)),
})
return kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: headers,
}
}
Классификация ошибок
R-KFK-RTRY-2: transient vs non-transient — через errors.As по типу, не по строке.
// core/order/errors.go
type GatewayError struct {
StatusCode int
Upstream string
Err error
}
func (e *GatewayError) Error() string {
return fmt.Sprintf("gateway %s %d: %v", e.Upstream, e.StatusCode, e.Err)
}
func (e *GatewayError) Unwrap() error { return e.Err }
// adapters/in/kafka/classify.go
func isTransient(err error) bool {
var ge *GatewayError
if errors.As(err, &ge) {
return ge.StatusCode >= 500 // 5xx — временная проблема upstream
}
var te *pgconn.PgError
if errors.As(err, &te) {
return te.Code == "40001" // serialization failure — retry ОК
}
return errors.Is(err, context.DeadlineExceeded)
}
| Тип ошибки | Retry? | Причина |
|---|---|---|
*GatewayError с StatusCode >= 500 | Да | upstream временно недоступен |
context.DeadlineExceeded | Да | таймаут, восстановится |
*pgconn.PgError код 40001 | Да | serialization failure |
*GatewayError с StatusCode < 500 | Нет | контракт нарушен, retry не поможет |
ошибка json.Unmarshal | Нет | poison pill, данные невалидны |
| ошибка валидации бизнес-правила | Нет | бизнес-инвариант нарушен, не временно |
Пример в handler:
// adapters/out/payment/client.go
func (c *PaymentClient) Charge(ctx context.Context, orderID string, amount int64) error {
resp, err := c.http.Post(ctx, "/charges", ChargeRequest{OrderID: orderID, Amount: amount})
if err != nil {
return &GatewayError{Upstream: "payment", StatusCode: 0, Err: err}
}
if resp.StatusCode >= 500 {
return &GatewayError{Upstream: "payment", StatusCode: resp.StatusCode, Err: errors.New(resp.Status)}
}
if resp.StatusCode >= 400 {
return &GatewayError{Upstream: "payment", StatusCode: resp.StatusCode, Err: errors.New(resp.Status)}
}
return nil
}
DLQ monitoring
R-KFK-RTRY-3: alert на размер DLQ.
Счётчик в relay-горутине:
var dlqMessagesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_dlq_messages_total",
Help: "Messages sent to DLQ",
}, []string{"topic"})
// при отправке в DLQ:
dlqMessagesTotal.WithLabelValues("orders.confirmed.dlq").Inc()
AlertManager правило:
- alert: KafkaDlqBacklog
expr: increase(kafka_dlq_messages_total{topic=~".*\\.dlq"}[1h]) > 10
for: 5m
annotations:
summary: "DLQ {{ $labels.topic }} получил > 10 сообщений за час"
runbook: "https://runbooks.internal/kafka-dlq"
Без алерта DLQ превращается в свалку: события накапливаются незаметно, каждое — непроведённая бизнес-операция. Для денежных операций порог > 1 — одно событие в DLQ уже инцидент.
Replay из DLQ
R-KFK-RTRY-4: ручная операция через admin endpoint.
// adapters/in/http/admin_dlq_handler.go
type DLQReplayHandler struct {
dlqReader *kafka.Reader
mainWriter *kafka.Writer
}
func (h *DLQReplayHandler) Replay(w http.ResponseWriter, r *http.Request) {
topic := chi.URLParam(r, "topic")
limitStr := r.URL.Query().Get("limit")
limit, _ := strconv.Atoi(limitStr)
if limit <= 0 || limit > 100 {
limit = 10
}
replayed := 0
for replayed < limit {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
msg, err := h.dlqReader.FetchMessage(ctx)
cancel()
if err != nil {
break
}
if err := writeTo(r.Context(), h.mainWriter, msg); err != nil {
slog.ErrorContext(r.Context(), "replay write failed", "topic", topic, "error", err)
break
}
_ = h.dlqReader.CommitMessages(r.Context(), msg)
replayed++
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]int{"replayed": replayed})
}
Replay не автоматический — bug в коде обработки отправит событие обратно в DLQ. Ops-команда смотрит на событие, исправляет причину, только потом реплеит.
Запуск горутин в main
// main.go
func run(ctx context.Context, cfg Config) error {
// reader конфиги
mainReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Kafka.Brokers,
Topic: cfg.Kafka.Topics.OrdersConfirmed,
GroupID: "billing-order-confirmed",
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
})
retry5sReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Kafka.Brokers,
Topic: cfg.Kafka.Topics.OrdersConfirmedRetry5s,
GroupID: "billing-order-confirmed-retry-5s",
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
})
// writer'ы
retry5sWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedRetry5s)
retry30sWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedRetry30s)
dlqWriter := newWriter(cfg.Kafka, cfg.Kafka.Topics.OrdersConfirmedDLQ)
handler := buildOrderConfirmedHandler(cfg)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
c := &OrderConfirmedConsumer{
reader: mainReader,
handler: handler,
retryWriter: retry5sWriter,
dlqWriter: dlqWriter,
}
return c.Run(ctx)
})
g.Go(func() error {
r := &RetryRelay{
reader: retry5sReader,
nextWriter: retry30sWriter,
handler: handler,
delay: 5 * time.Second,
maxAttempts: 3,
dlqWriter: dlqWriter,
}
return r.Run(ctx)
})
// аналогично для retry.30s → dlq
return g.Wait()
}
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
time.Sleep в основной poll-горутине | R-KFK-RTRY-X1 | отдельная relay-горутина с задержкой |
if err != nil { slog.Error(...); /* commit */ } | R-KFK-RTRY-X2 | publish в retry или DLQ, затем commit |
| retry-топик без счётчика попыток | R-KFK-RTRY-X3 | заголовок X-Retry-Attempts + maxAttempts |
| DLQ без алерта | R-KFK-RTRY-X4 | kafka_dlq_messages_total + AlertManager |
isTransient по строке ошибки | R-KFK-RTRY-2 | errors.As по типу |
| auto-replay из DLQ | R-KFK-RTRY-4 | admin endpoint за авторизацией |
| общий retry-топик для всех listener-ов сервиса | R-KFK-RTRY-1 | per-listener retry топики |
Куда дальше
- Consumer — почему
time.Sleepв poll-горутине ломает consumer. - Idempotent consumer — replay из DLQ = дубль; как защититься через
processed_event. - Observability — consumer lag, DLQ size alerts,
promauto. - Event design —
EventIDв payload; poison pill и схема сообщения. - Конфигурация —
KafkaConfigчерезenvconfig; retry-топики какrequired:"true". - Producer —
RequiredAcks: kafka.RequireAll; ключ partition для ordering. - Outbox publishing — почему domain-событие не публикуется напрямую из handler.
- Security — TLS-dialer; per-service
ClientIDдля ACL.