Опирается на правила: R-KFK-CONS-1R-KFK-CONS-6 и R-KFK-CONS-X1R-KFK-CONS-X4 из Kafka Style Guide → раздел 2. Consumer.

Важно знать

  • GroupID в формате <service>-<purpose> — один GroupID = одна логическая роль, не общий на весь сервис.
  • CommitInterval: 0 — обязателен; авто-коммит = потеря событий при crash.
  • reader.CommitMessages(ctx, msg) — только после успешной обработки; poison pill сначала в DLQ, потом commit.
  • StartOffset: kafka.FirstOffset — для critical-consumer'ов; kafka.LastOffset пропускает события при новом group.
  • Listener — горутина с for { FetchMessage; handle; CommitMessages } — идиома Go-стека; context.Canceled = штатный выход.
  • Concurrency через N горутин на одном GroupID, количество ≤ числу партиций.
  • Никакого time.Sleep в poll-горутине — блокирует цикл, Kafka считает consumer мёртвым.
  • HTTP к внешней системе из горутины-listener без CB/bulkhead — listener зависает, rebalance, дубликаты.

Consumer — точка, где сервис принимает факт из внешнего мира. В Go-стеке это горутина с явным poll-циклом: kafka.Reader не управляет обработкой автоматически, вся логика ручная. Главные опасности — авто-коммит смещения до завершения обработки и блокировка poll-цикла.

GroupID: одна роль — один группа

R-KFK-CONS-1: формат <service>-<purpose>.

// infra/kafka/reader.go

func NewOrderConfirmedReader(cfg KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.OrdersConfirmed,
        GroupID:        "billing-order-confirmed",   // <service>-<purpose>
        MinBytes:       1,
        MaxBytes:       10 << 20,
        CommitInterval: 0,                           // manual commit
        StartOffset:    kafka.FirstOffset,           // earliest для critical
    })
}

func NewProductCreatedReader(cfg KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:     cfg.Brokers,
        Topic:       cfg.Topics.ProductsCreated,
        GroupID:     "inventory-product-created",    // отдельный group — отдельный offset
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
    })
}

Два listener-а в одном сервисе — два разных GroupID. Если использовать общий GroupID: "order-service", Kafka распределяет партиции между consumer-instance-ами всей группы: ребалансировка одного listener-а задевает другой, независимые offset-ы становятся общими.

Manual commit: CommitInterval = 0

R-KFK-CONS-2: CommitInterval: 0 отключает авто-коммит. Commit — явным reader.CommitMessages(ctx, msg) после успешной обработки.

// adapters/in/kafka/order_confirmed_consumer.go

type OrderConfirmedConsumer struct {
    reader  *kafka.Reader
    handler *OrderConfirmedHandler
    dlq     *kafka.Writer
}

func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) {
                return nil
            }
            return fmt.Errorf("fetch OrderConfirmed: %w", err)
        }

        var evt OrderConfirmedEvent
        if err := json.Unmarshal(msg.Value, &evt); err != nil {
            c.sendToDLQ(ctx, msg, err)
            if err := c.reader.CommitMessages(ctx, msg); err != nil {
                return fmt.Errorf("commit after DLQ (poison): %w", err)
            }
            continue
        }

        if err := c.handler.Handle(ctx, evt); err != nil {
            if isTransient(err) {
                c.sendToRetry(ctx, msg)
            } else {
                c.sendToDLQ(ctx, msg, err)
            }
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit OrderConfirmed %s: %w", evt.OrderID, err)
        }
    }
}

Последовательность: FetchMessageUnmarshalHandleCommitMessages. Коммит всегда последний — даже для poison pill, когда сообщение идёт в DLQ: мы признаём факт его получения, но не пытаемся обрабатывать снова.

Если Handle вернул transient-ошибку — отправляем в retry-топик, после чего всё равно коммитим: дальнейшая обработка произойдёт через retry-горутину с задержкой вне poll-цикла.

Идемпотентность: listener обязан переносить дубли

R-KFK-CONS-3: at-least-once delivery — норма Kafka. Дублирование происходит при rebalance до CommitMessages, при DLQ replay, при перезапуске сервиса.

// adapters/in/kafka/order_confirmed_handler.go

func (h *OrderConfirmedHandler) Handle(ctx context.Context, evt OrderConfirmedEvent) error {
    tx, err := h.db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin: %w", err)
    }
    defer tx.Rollback(ctx)

    inserted, err := h.dedup.TryInsert(ctx, tx, evt.EventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }
    if !inserted {
        return nil
    }

    if err := h.billing.ApplyOrderConfirmed(ctx, tx, evt); err != nil {
        return err
    }
    return tx.Commit(ctx)
}

TryInsert вставляет event_id в таблицу processed_event с PRIMARY KEY — duplicate key = уже обработано, выходим без ошибки. Бизнес-результат и запись в processed_event в одной транзакции (R-KFK-IDEM-3). Подробнее — Idempotent consumer.

StartOffset: kafka.FirstOffset для critical-топиков

R-KFK-CONS-4: StartOffset: kafka.FirstOffset эквивалентен auto.offset.reset=earliest.

func NewCustomerCreatedReader(cfg KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.CustomersCreated,
        GroupID:        "crm-customer-created",
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,   // начинаем с самого старого
    })
}

kafka.FirstOffset срабатывает только когда у GroupID нет сохранённого offset-а — при первом запуске или после сброса. Для action-топиков (orders, payments, customers) — обязательно: kafka.LastOffset при новом consumer-group пропустит все события, существовавшие до первого poll.

Для аналитических топиков, где старые данные не нужны, kafka.LastOffset приемлем — но это осознанное исключение.

Concurrency через горутины

R-KFK-CONS-5: параллелизм в Go-стеке — N горутин с отдельными kafka.Reader на одном GroupID; количество ≤ числу партиций.

// main.go или wire.go

func startProductStockConsumers(ctx context.Context, cfg KafkaConfig, h *ProductStockHandler, partitions int) {
    for i := 0; i < partitions; i++ {
        reader := kafka.NewReader(kafka.ReaderConfig{
            Brokers:        cfg.Brokers,
            Topic:          cfg.Topics.ProductStockUpdated,
            GroupID:        "warehouse-product-stock",
            CommitInterval: 0,
            StartOffset:    kafka.FirstOffset,
        })
        consumer := &ProductStockConsumer{reader: reader, handler: h}
        go func() {
            if err := consumer.Run(ctx); err != nil {
                slog.ErrorContext(ctx, "product stock consumer stopped", "error", err)
            }
        }()
    }
}

Kafka назначает каждую партицию ровно одному consumer в группе. Горутин больше, чем партиций — лишние не получат партицию и будут ждать в FetchMessage без работы.

MaxWait и контекст вместо таймаута

R-KFK-CONS-6: MaxWait — максимальное время ожидания данных за одним FetchMessage. Не путать с таймаутом обработки — это не одно и то же.

func NewSberPaymentReader(cfg KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.SberPaymentEvents,
        GroupID:        "payment-sber-events",
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
        MaxWait:        500 * time.Millisecond,
    })
}

Если обработка одного события занимает несколько секунд (например, DB-запрос + downstream call) — нужно следить, чтобы ctx с дедлайном не истекал до CommitMessages. Не добавляйте context.WithTimeout с коротким дедлайном в poll-горутине: истёкший контекст прервёт CommitMessages, и offset не будет зафиксирован.

Запреты

CommitInterval > 0 (авто-коммит)

R-KFK-CONS-X1: CommitInterval > 0 означает, что kafka-go коммитит offset по таймеру — независимо от того, завершилась ли обработка.

Сценарий: consumer получил 100 сообщений, обрабатывает медленно. Через 1 секунду (при CommitInterval: time.Second) offset за все 100 закоммичен — но обработано только 15. Crash → restart → consumer читает с 101. 85 событий OrderConfirmed потеряны, биллинг не выставлен.

time.Sleep в poll-горутине

R-KFK-CONS-X2: time.Sleep в теле Run блокирует poll-цикл. Kafka не получает heartbeat → считает consumer dead → rebalance → partition уходит другому consumer → тот же вызов CommitMessages при возврате упадёт с ошибкой → дубликаты.

// НЕПРАВИЛЬНО
func (c *OrderConsumer) Run(ctx context.Context) error {
    for {
        msg, _ := c.reader.FetchMessage(ctx)
        for i := 0; i < 3; i++ {
            if err := c.handler.Handle(ctx, msg); err == nil {
                break
            }
            time.Sleep(30 * time.Second) // блокирует poll-цикл
        }
        c.reader.CommitMessages(ctx, msg)
    }
}

Retry с задержкой — через отдельные retry-топики и отдельные горутины-relay с time.After вне poll-цикла. Подробнее — Retry topic + DLQ.

GroupID пустой или общий

R-KFK-CONS-X3: GroupID: ""kafka-go генерирует случайный GroupID при каждом запуске. Нет shared offset-а, нет ребалансировки между pods. Каждый рестарт сервис читает топик заново с StartOffset.

Общий GroupID для разных consumer-горутин с разными ролями: Kafka видит их как один consumer-group, распределяет партиции произвольно — горутина «для orders» может получить партицию «для payments».

HTTP к внешней системе без CB/bulkhead

R-KFK-CONS-X4: если Handle делает HTTP-вызов и внешняя система не отвечает, горутина зависает на timeout. При стандартном http.Client без явного timeout — потенциально навсегда.

// НЕПРАВИЛЬНО — нет таймаута, нет circuit breaker
func (h *CustomerProfileHandler) Handle(ctx context.Context, evt CustomerCreatedEvent) error {
    profile, err := h.profileClient.Get(evt.CustomerID)
    // если profileClient лежит — горутина висит
    ...
}

Правильно — передавать ctx с дедлайном в HTTP-клиент и оборачивать вызов circuit breaker-ом. Зависший listener не коммитит offset, через MaxWait + heartbeat interval Kafka инициирует rebalance — и те же сообщения получит другая горутина, запустив шторм повторов.

Что запрещено

АнтипаттернПравилоЧто взамен
CommitInterval > 0 (авто-коммит)R-KFK-CONS-X1CommitInterval: 0 + явный CommitMessages
time.Sleep в poll-горутинеR-KFK-CONS-X2retry-топик + отдельная горутина-relay
GroupID: "" или общий на несколько ролейR-KFK-CONS-X3"<service>-<purpose>" на каждый consumer
HTTP к внешней системе без CB/bulkheadR-KFK-CONS-X4CB + ctx с дедлайном
StartOffset: kafka.LastOffset для criticalR-KFK-CONS-4kafka.FirstOffset
N горутин > числа партицийR-KFK-CONS-5N ≤ partitions
Commit до обработки / нет commit при poison pillR-KFK-CONS-2commit всегда последним, включая DLQ-путь
Kafka offset как dedup-ключR-KFK-IDEM-X2event_id из payload + processed_event

Куда дальше

  • Idempotent consumer — processed_event, dedup в одной транзакции.
  • Retry topic + DLQ — non-blocking retry через отдельные горутины.
  • Конфигурация — KafkaConfig через envconfig, проверка топиков на старте.
  • Observability — consumer lag alerts, traceparent через headers.
  • Outbox publishing — почему domain-события не идут прямо из handler.
  • Producer — kafka.Writer, partition key, RequiredAcks.
  • Event design — структура payload, EventID, forward-compat.
  • Security — TLS-dialer, per-service ClientID, PII.