Опирается на правила: R-SHUT-KFK-1R-SHUT-KFK-4 и R-SHUT-KFK-X1 из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.

Важно знать

  • Consumer управляется через context.Context — отмена контекста на SIGTERM останавливает FetchMessage после текущего сообщения.
  • CommitMessages вызывается явно после каждого успешного handle — у kafka-go нет автоматического batch-commit; offset фиксируется только кодом.
  • Cascade HTTP + retry > 20s в handle недопустим — не укладывается в budget; cascade выносится в outbox.
  • writer.Close() — обязательный flush pending batch на broker перед завершением; без него отправленные но ещё не flushed сообщения теряются.
  • AutoCommit через CommitInterval — антипаттерн: часть offset фиксируется до обработки, потеря сообщений при SIGTERM.
  • Consumer-горутина регистрируется в sync.WaitGroup — shutdown-последовательность дожидается её завершения перед pool.Close().
  • Ошибки context.Canceled и io.EOF — нормальный выход, не slog.Error; алертинг зашумляется на каждом деплое.

В Spring Kafka shutdown-логика встроена в ConcurrentMessageListenerContainer: контейнер сам дожимает текущий batch и коммитит offset. В Go это ответственность кода: нужно явно отменить контекст, явно вызвать CommitMessages и явно вызвать writer.Close(). Это не сложнее — но требует осознанного порядка.

Consumer — отмена контекста и ручной CommitMessages

R-SHUT-KFK-1: consumer дожимает текущее сообщение и коммитит offset перед завершением.

// internal/consumer/order_consumer.go
type OrderConsumer struct {
    reader  *kafka.Reader
    queries *db.Queries
    orders  *order.Service
}

func (c *OrderConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
                return nil
            }
            return fmt.Errorf("fetch message: %w", err)
        }

        if err := c.handle(ctx, msg); err != nil {
            return fmt.Errorf("handle order event: %w", err)
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit offset: %w", err)
        }
    }
}

Когда на SIGTERM отменяется ctx, следующий вызов FetchMessage(ctx) возвращает context.Canceled — горутина выходит через return nil после того, как текущий handle + CommitMessages завершён. Offset зафиксирован, replay не возникает.

Регистрация в WaitGroup — в точке запуска, не внутри горутины:

// cmd/order-service/main.go (фрагмент)
consumerCtx, cancelConsumer := context.WithCancel(ctx)

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    if err := orderConsumer.Run(consumerCtx); err != nil {
        slog.ErrorContext(ctx, "order consumer exited with error", "error", err)
    }
}()

Дедупликация на случай replay

R-SHUT-KFK-1, R-SHUT-IDEM-1: если SIGTERM пришёл после handle, но до CommitMessages, при перезапуске сообщение придёт повторно. Consumer должен быть idempotent:

// internal/consumer/order_consumer.go
func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal order event: %w", err)
    }

    _, err := c.queries.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:    event.ID,
        ConsumerGroup: "billing-confirmations",
    })
    if errors.Is(err, ErrAlreadyProcessed) {
        return nil
    }
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    return c.orders.RecordConfirmation(ctx, event.OrderID, event.TotalAmount)
}

InsertProcessedEventINSERT INTO processed_event ... ON CONFLICT (event_id, consumer_group) DO NOTHING; при replay возвращает ErrAlreadyProcessed, обработка пропускается.

Listener не запускает долгий cascade

R-SHUT-KFK-2: handle не вызывает HTTP с retry на 20–30s — не укладывается в shutdown budget.

Опасный вариант:

func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event ProductPriceChangedEvent
    _ = json.Unmarshal(msg.Value, &event)

    // ОПАСНО: HTTP + retry → 30s, shutdown зависает
    if err := c.catalogClient.UpdatePrice(ctx, event.ProductID, event.NewPrice); err != nil {
        return fmt.Errorf("update price: %w", err)
    }
    if err := c.searchClient.Reindex(ctx, event.ProductID); err != nil {
        return fmt.Errorf("reindex product: %w", err)
    }
    return nil
}

При SIGTERM ctx отменяется, HTTP-клиенты получают context.Canceled — возможен частичный cascade. Правильно — локальная транзакция + outbox:

func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event ProductPriceChangedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal product event: %w", err)
    }

    return sqlcTx(ctx, c.pool, func(q *db.Queries) error {
        if err := q.UpdateProductPrice(ctx, db.UpdateProductPriceParams{
            ProductID: event.ProductID,
            Price:     event.NewPrice,
        }); err != nil {
            return fmt.Errorf("update product price: %w", err)
        }
        return q.InsertOutboxEvent(ctx, db.InsertOutboxEventParams{
            EventType: "ProductPriceChanged",
            Payload:   msg.Value,
        })
    })
}

handle завершается за < 50ms. Outbox-relay (отдельная горутина) отправляет HTTP — на своём бюджете, со своим дожатием текущего batch на shutdown.

Producer — явный writer.Close()

R-SHUT-KFK-4: kafka.Writer накапливает сообщения в batch и отправляет их broker-у асинхронно. Без Close() pending batch теряется.

// internal/publisher/order_publisher.go
type OrderPublisher struct {
    writer *kafka.Writer
}

func NewOrderPublisher(brokers []string) *OrderPublisher {
    return &OrderPublisher{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        "orders.confirmed",
            Balancer:     &kafka.LeastBytes{},
            BatchTimeout: 5 * time.Millisecond,
        },
    }
}

func (p *OrderPublisher) Publish(ctx context.Context, event OrderConfirmedEvent) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal order confirmed: %w", err)
    }
    if err := p.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.OrderID),
        Value: payload,
    }); err != nil {
        return fmt.Errorf("write order confirmed: %w", err)
    }
    return nil
}

func (p *OrderPublisher) Close() error {
    return p.writer.Close()
}
// cmd/order-service/main.go — shutdown-последовательность
shutdownFns := []func(){
    func() { appState.SetNotReady() },                // R-SHUT-CFG-3 — readiness → 503
    func() { cancelConsumer() },                       // R-SHUT-KFK-1 — сигнал consumer
    func() { wg.Wait() },                             // ждём CommitMessages
    func() {                                          // R-SHUT-KFK-4 — flush writer
        if err := orderPublisher.Close(); err != nil {
            slog.ErrorContext(ctx, "kafka writer close", "error", err)
        }
    },
    func() { srv.Shutdown(shutCtx) },                 // R-SHUT-HTTP-1 — HTTP drain
    func() { pool.Close() },                          // R-SHUT-DB-1 — последним
}

Порядок важен: cancelConsumer() + wg.Wait() — consumer завершён, offset зафиксирован; затем publisher.Close() — pending batch сброшен на broker. Только после этого закрываем HTTP и пул.

Observability — структурный лог и метрика

R-SHUT-OBS-2, R-SHUT-OBS-3:

// cmd/order-service/main.go
case sig := <-sigC:
    slog.InfoContext(ctx, "получили SIGTERM, начинаем graceful shutdown",
        "signal", sig.String())
    appState.SetNotReady()

start := time.Now()
for _, fn := range shutdownFns {
    fn()
}
dur := time.Since(start).Seconds()
shutdownDuration.Set(dur) // promauto Gauge app_shutdown_duration_seconds
slog.InfoContext(ctx, "graceful shutdown завершён", "duration_s", dur)

context.Canceled и io.EOF от FetchMessage — нормальный выход consumer-горутины, логировать на Info, не Error (R-SHUT-OBS-X1).

Что запрещено

АнтипаттернПравилоЧто взамен
CommitInterval / авто-commit в kafka-go readerR-SHUT-KFK-X1Явный CommitMessages после каждого handle
Долгий cascade (HTTP + retry > 20s) в handleR-SHUT-KFK-2Локальная TX + outbox-event; relay в отдельной горутине
writer.Close() пропущен в shutdownR-SHUT-KFK-4publisher.Close() перед pool.Close() в shutdown-последовательности
pool.Close() до wg.Wait()R-SHUT-DB-X1wg.Wait() — pgx не паникует при взятии соединения из закрытого пула
slog.Error на context.Canceled из FetchMessageR-SHUT-OBS-X1return nil на context.Canceled/io.EOF; alert-канал без шума на деплоях
Consumer-горутина без WaitGroupR-SHUT-KFK-1wg.Add(1) + defer wg.Done() — shutdown дожидается CommitMessages
reader.Close() как единственная остановка без отмены контекстаR-SHUT-KFK-1cancelConsumer() + дожатие wg.Wait(); reader.Close() опционально после

Куда дальше