Опирается на правила:
R-KFK-CONS-1…R-KFK-CONS-6иR-KFK-CONS-X1…R-KFK-CONS-X4из Kafka Style Guide → раздел 2. Consumer.
Важно знать
GroupIDв формате<service>-<purpose>— одинGroupID= одна логическая роль, не общий на весь сервис.CommitInterval: 0— обязателен; авто-коммит = потеря событий при crash.reader.CommitMessages(ctx, msg)— только после успешной обработки; poison pill сначала в DLQ, потом commit.StartOffset: kafka.FirstOffset— для critical-consumer'ов;kafka.LastOffsetпропускает события при новом group.- Listener — горутина с
for { FetchMessage; handle; CommitMessages }— идиома Go-стека;context.Canceled= штатный выход.- Concurrency через N горутин на одном
GroupID, количество ≤ числу партиций.- Никакого
time.Sleepв poll-горутине — блокирует цикл, Kafka считает consumer мёртвым.- HTTP к внешней системе из горутины-listener без CB/bulkhead — listener зависает, rebalance, дубликаты.
Consumer — точка, где сервис принимает факт из внешнего мира. В Go-стеке это горутина с явным poll-циклом: kafka.Reader не управляет обработкой автоматически, вся логика ручная. Главные опасности — авто-коммит смещения до завершения обработки и блокировка poll-цикла.
GroupID: одна роль — один группа
R-KFK-CONS-1: формат <service>-<purpose>.
// infra/kafka/reader.go
func NewOrderConfirmedReader(cfg KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.OrdersConfirmed,
GroupID: "billing-order-confirmed", // <service>-<purpose>
MinBytes: 1,
MaxBytes: 10 << 20,
CommitInterval: 0, // manual commit
StartOffset: kafka.FirstOffset, // earliest для critical
})
}
func NewProductCreatedReader(cfg KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.ProductsCreated,
GroupID: "inventory-product-created", // отдельный group — отдельный offset
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
})
}
Два listener-а в одном сервисе — два разных GroupID. Если использовать общий GroupID: "order-service", Kafka распределяет партиции между consumer-instance-ами всей группы: ребалансировка одного listener-а задевает другой, независимые offset-ы становятся общими.
Manual commit: CommitInterval = 0
R-KFK-CONS-2: CommitInterval: 0 отключает авто-коммит. Commit — явным reader.CommitMessages(ctx, msg) после успешной обработки.
// adapters/in/kafka/order_confirmed_consumer.go
type OrderConfirmedConsumer struct {
reader *kafka.Reader
handler *OrderConfirmedHandler
dlq *kafka.Writer
}
func (c *OrderConfirmedConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("fetch OrderConfirmed: %w", err)
}
var evt OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
c.sendToDLQ(ctx, msg, err)
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit after DLQ (poison): %w", err)
}
continue
}
if err := c.handler.Handle(ctx, evt); err != nil {
if isTransient(err) {
c.sendToRetry(ctx, msg)
} else {
c.sendToDLQ(ctx, msg, err)
}
}
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit OrderConfirmed %s: %w", evt.OrderID, err)
}
}
}
Последовательность: FetchMessage → Unmarshal → Handle → CommitMessages. Коммит всегда последний — даже для poison pill, когда сообщение идёт в DLQ: мы признаём факт его получения, но не пытаемся обрабатывать снова.
Если Handle вернул transient-ошибку — отправляем в retry-топик, после чего всё равно коммитим: дальнейшая обработка произойдёт через retry-горутину с задержкой вне poll-цикла.
Идемпотентность: listener обязан переносить дубли
R-KFK-CONS-3: at-least-once delivery — норма Kafka. Дублирование происходит при rebalance до CommitMessages, при DLQ replay, при перезапуске сервиса.
// adapters/in/kafka/order_confirmed_handler.go
func (h *OrderConfirmedHandler) Handle(ctx context.Context, evt OrderConfirmedEvent) error {
tx, err := h.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback(ctx)
inserted, err := h.dedup.TryInsert(ctx, tx, evt.EventID)
if err != nil {
return fmt.Errorf("dedup check: %w", err)
}
if !inserted {
return nil
}
if err := h.billing.ApplyOrderConfirmed(ctx, tx, evt); err != nil {
return err
}
return tx.Commit(ctx)
}
TryInsert вставляет event_id в таблицу processed_event с PRIMARY KEY — duplicate key = уже обработано, выходим без ошибки. Бизнес-результат и запись в processed_event в одной транзакции (R-KFK-IDEM-3). Подробнее — Idempotent consumer.
StartOffset: kafka.FirstOffset для critical-топиков
R-KFK-CONS-4: StartOffset: kafka.FirstOffset эквивалентен auto.offset.reset=earliest.
func NewCustomerCreatedReader(cfg KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.CustomersCreated,
GroupID: "crm-customer-created",
CommitInterval: 0,
StartOffset: kafka.FirstOffset, // начинаем с самого старого
})
}
kafka.FirstOffset срабатывает только когда у GroupID нет сохранённого offset-а — при первом запуске или после сброса. Для action-топиков (orders, payments, customers) — обязательно: kafka.LastOffset при новом consumer-group пропустит все события, существовавшие до первого poll.
Для аналитических топиков, где старые данные не нужны, kafka.LastOffset приемлем — но это осознанное исключение.
Concurrency через горутины
R-KFK-CONS-5: параллелизм в Go-стеке — N горутин с отдельными kafka.Reader на одном GroupID; количество ≤ числу партиций.
// main.go или wire.go
func startProductStockConsumers(ctx context.Context, cfg KafkaConfig, h *ProductStockHandler, partitions int) {
for i := 0; i < partitions; i++ {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.ProductStockUpdated,
GroupID: "warehouse-product-stock",
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
})
consumer := &ProductStockConsumer{reader: reader, handler: h}
go func() {
if err := consumer.Run(ctx); err != nil {
slog.ErrorContext(ctx, "product stock consumer stopped", "error", err)
}
}()
}
}
Kafka назначает каждую партицию ровно одному consumer в группе. Горутин больше, чем партиций — лишние не получат партицию и будут ждать в FetchMessage без работы.
MaxWait и контекст вместо таймаута
R-KFK-CONS-6: MaxWait — максимальное время ожидания данных за одним FetchMessage. Не путать с таймаутом обработки — это не одно и то же.
func NewSberPaymentReader(cfg KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.SberPaymentEvents,
GroupID: "payment-sber-events",
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
MaxWait: 500 * time.Millisecond,
})
}
Если обработка одного события занимает несколько секунд (например, DB-запрос + downstream call) — нужно следить, чтобы ctx с дедлайном не истекал до CommitMessages. Не добавляйте context.WithTimeout с коротким дедлайном в poll-горутине: истёкший контекст прервёт CommitMessages, и offset не будет зафиксирован.
Запреты
CommitInterval > 0 (авто-коммит)
R-KFK-CONS-X1: CommitInterval > 0 означает, что kafka-go коммитит offset по таймеру — независимо от того, завершилась ли обработка.
Сценарий: consumer получил 100 сообщений, обрабатывает медленно. Через 1 секунду (при CommitInterval: time.Second) offset за все 100 закоммичен — но обработано только 15. Crash → restart → consumer читает с 101. 85 событий OrderConfirmed потеряны, биллинг не выставлен.
time.Sleep в poll-горутине
R-KFK-CONS-X2: time.Sleep в теле Run блокирует poll-цикл. Kafka не получает heartbeat → считает consumer dead → rebalance → partition уходит другому consumer → тот же вызов CommitMessages при возврате упадёт с ошибкой → дубликаты.
// НЕПРАВИЛЬНО
func (c *OrderConsumer) Run(ctx context.Context) error {
for {
msg, _ := c.reader.FetchMessage(ctx)
for i := 0; i < 3; i++ {
if err := c.handler.Handle(ctx, msg); err == nil {
break
}
time.Sleep(30 * time.Second) // блокирует poll-цикл
}
c.reader.CommitMessages(ctx, msg)
}
}
Retry с задержкой — через отдельные retry-топики и отдельные горутины-relay с time.After вне poll-цикла. Подробнее — Retry topic + DLQ.
GroupID пустой или общий
R-KFK-CONS-X3: GroupID: "" — kafka-go генерирует случайный GroupID при каждом запуске. Нет shared offset-а, нет ребалансировки между pods. Каждый рестарт сервис читает топик заново с StartOffset.
Общий GroupID для разных consumer-горутин с разными ролями: Kafka видит их как один consumer-group, распределяет партиции произвольно — горутина «для orders» может получить партицию «для payments».
HTTP к внешней системе без CB/bulkhead
R-KFK-CONS-X4: если Handle делает HTTP-вызов и внешняя система не отвечает, горутина зависает на timeout. При стандартном http.Client без явного timeout — потенциально навсегда.
// НЕПРАВИЛЬНО — нет таймаута, нет circuit breaker
func (h *CustomerProfileHandler) Handle(ctx context.Context, evt CustomerCreatedEvent) error {
profile, err := h.profileClient.Get(evt.CustomerID)
// если profileClient лежит — горутина висит
...
}
Правильно — передавать ctx с дедлайном в HTTP-клиент и оборачивать вызов circuit breaker-ом. Зависший listener не коммитит offset, через MaxWait + heartbeat interval Kafka инициирует rebalance — и те же сообщения получит другая горутина, запустив шторм повторов.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
CommitInterval > 0 (авто-коммит) | R-KFK-CONS-X1 | CommitInterval: 0 + явный CommitMessages |
time.Sleep в poll-горутине | R-KFK-CONS-X2 | retry-топик + отдельная горутина-relay |
GroupID: "" или общий на несколько ролей | R-KFK-CONS-X3 | "<service>-<purpose>" на каждый consumer |
| HTTP к внешней системе без CB/bulkhead | R-KFK-CONS-X4 | CB + ctx с дедлайном |
StartOffset: kafka.LastOffset для critical | R-KFK-CONS-4 | kafka.FirstOffset |
| N горутин > числа партиций | R-KFK-CONS-5 | N ≤ partitions |
| Commit до обработки / нет commit при poison pill | R-KFK-CONS-2 | commit всегда последним, включая DLQ-путь |
| Kafka offset как dedup-ключ | R-KFK-IDEM-X2 | event_id из payload + processed_event |
Куда дальше
- Idempotent consumer —
processed_event, dedup в одной транзакции. - Retry topic + DLQ — non-blocking retry через отдельные горутины.
- Конфигурация —
KafkaConfigчерезenvconfig, проверка топиков на старте. - Observability — consumer lag alerts,
traceparentчерез headers. - Outbox publishing — почему domain-события не идут прямо из handler.
- Producer —
kafka.Writer, partition key,RequiredAcks. - Event design — структура payload,
EventID, forward-compat. - Security — TLS-dialer, per-service
ClientID, PII.