Опирается на правила:
R-SHUT-KFK-1…R-SHUT-KFK-4иR-SHUT-KFK-X1из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.
Важно знать
- Consumer управляется через
context.Context— отмена контекста на SIGTERM останавливаетFetchMessageпосле текущего сообщения.CommitMessagesвызывается явно после каждого успешногоhandle— у kafka-go нет автоматического batch-commit; offset фиксируется только кодом.- Cascade HTTP + retry > 20s в
handleнедопустим — не укладывается в budget; cascade выносится в outbox.writer.Close()— обязательный flush pending batch на broker перед завершением; без него отправленные но ещё не flushed сообщения теряются.AutoCommitчерезCommitInterval— антипаттерн: часть offset фиксируется до обработки, потеря сообщений при SIGTERM.- Consumer-горутина регистрируется в
sync.WaitGroup— shutdown-последовательность дожидается её завершения передpool.Close().- Ошибки
context.Canceledиio.EOF— нормальный выход, неslog.Error; алертинг зашумляется на каждом деплое.
В Spring Kafka shutdown-логика встроена в ConcurrentMessageListenerContainer: контейнер сам дожимает текущий batch и коммитит offset. В Go это ответственность кода: нужно явно отменить контекст, явно вызвать CommitMessages и явно вызвать writer.Close(). Это не сложнее — но требует осознанного порядка.
Consumer — отмена контекста и ручной CommitMessages
R-SHUT-KFK-1: consumer дожимает текущее сообщение и коммитит offset перед завершением.
// internal/consumer/order_consumer.go
type OrderConsumer struct {
reader *kafka.Reader
queries *db.Queries
orders *order.Service
}
func (c *OrderConsumer) Run(ctx context.Context) error {
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("fetch message: %w", err)
}
if err := c.handle(ctx, msg); err != nil {
return fmt.Errorf("handle order event: %w", err)
}
if err := c.reader.CommitMessages(ctx, msg); err != nil {
return fmt.Errorf("commit offset: %w", err)
}
}
}
Когда на SIGTERM отменяется ctx, следующий вызов FetchMessage(ctx) возвращает context.Canceled — горутина выходит через return nil после того, как текущий handle + CommitMessages завершён. Offset зафиксирован, replay не возникает.
Регистрация в WaitGroup — в точке запуска, не внутри горутины:
// cmd/order-service/main.go (фрагмент)
consumerCtx, cancelConsumer := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := orderConsumer.Run(consumerCtx); err != nil {
slog.ErrorContext(ctx, "order consumer exited with error", "error", err)
}
}()
Дедупликация на случай replay
R-SHUT-KFK-1, R-SHUT-IDEM-1: если SIGTERM пришёл после handle, но до CommitMessages, при перезапуске сообщение придёт повторно. Consumer должен быть idempotent:
// internal/consumer/order_consumer.go
func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
var event OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal order event: %w", err)
}
_, err := c.queries.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
EventID: event.ID,
ConsumerGroup: "billing-confirmations",
})
if errors.Is(err, ErrAlreadyProcessed) {
return nil
}
if err != nil {
return fmt.Errorf("dedup check: %w", err)
}
return c.orders.RecordConfirmation(ctx, event.OrderID, event.TotalAmount)
}
InsertProcessedEvent — INSERT INTO processed_event ... ON CONFLICT (event_id, consumer_group) DO NOTHING; при replay возвращает ErrAlreadyProcessed, обработка пропускается.
Listener не запускает долгий cascade
R-SHUT-KFK-2: handle не вызывает HTTP с retry на 20–30s — не укладывается в shutdown budget.
Опасный вариант:
func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
var event ProductPriceChangedEvent
_ = json.Unmarshal(msg.Value, &event)
// ОПАСНО: HTTP + retry → 30s, shutdown зависает
if err := c.catalogClient.UpdatePrice(ctx, event.ProductID, event.NewPrice); err != nil {
return fmt.Errorf("update price: %w", err)
}
if err := c.searchClient.Reindex(ctx, event.ProductID); err != nil {
return fmt.Errorf("reindex product: %w", err)
}
return nil
}
При SIGTERM ctx отменяется, HTTP-клиенты получают context.Canceled — возможен частичный cascade. Правильно — локальная транзакция + outbox:
func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
var event ProductPriceChangedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("unmarshal product event: %w", err)
}
return sqlcTx(ctx, c.pool, func(q *db.Queries) error {
if err := q.UpdateProductPrice(ctx, db.UpdateProductPriceParams{
ProductID: event.ProductID,
Price: event.NewPrice,
}); err != nil {
return fmt.Errorf("update product price: %w", err)
}
return q.InsertOutboxEvent(ctx, db.InsertOutboxEventParams{
EventType: "ProductPriceChanged",
Payload: msg.Value,
})
})
}
handle завершается за < 50ms. Outbox-relay (отдельная горутина) отправляет HTTP — на своём бюджете, со своим дожатием текущего batch на shutdown.
Producer — явный writer.Close()
R-SHUT-KFK-4: kafka.Writer накапливает сообщения в batch и отправляет их broker-у асинхронно. Без Close() pending batch теряется.
// internal/publisher/order_publisher.go
type OrderPublisher struct {
writer *kafka.Writer
}
func NewOrderPublisher(brokers []string) *OrderPublisher {
return &OrderPublisher{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "orders.confirmed",
Balancer: &kafka.LeastBytes{},
BatchTimeout: 5 * time.Millisecond,
},
}
}
func (p *OrderPublisher) Publish(ctx context.Context, event OrderConfirmedEvent) error {
payload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal order confirmed: %w", err)
}
if err := p.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(event.OrderID),
Value: payload,
}); err != nil {
return fmt.Errorf("write order confirmed: %w", err)
}
return nil
}
func (p *OrderPublisher) Close() error {
return p.writer.Close()
}
// cmd/order-service/main.go — shutdown-последовательность
shutdownFns := []func(){
func() { appState.SetNotReady() }, // R-SHUT-CFG-3 — readiness → 503
func() { cancelConsumer() }, // R-SHUT-KFK-1 — сигнал consumer
func() { wg.Wait() }, // ждём CommitMessages
func() { // R-SHUT-KFK-4 — flush writer
if err := orderPublisher.Close(); err != nil {
slog.ErrorContext(ctx, "kafka writer close", "error", err)
}
},
func() { srv.Shutdown(shutCtx) }, // R-SHUT-HTTP-1 — HTTP drain
func() { pool.Close() }, // R-SHUT-DB-1 — последним
}
Порядок важен: cancelConsumer() + wg.Wait() — consumer завершён, offset зафиксирован; затем publisher.Close() — pending batch сброшен на broker. Только после этого закрываем HTTP и пул.
Observability — структурный лог и метрика
R-SHUT-OBS-2, R-SHUT-OBS-3:
// cmd/order-service/main.go
case sig := <-sigC:
slog.InfoContext(ctx, "получили SIGTERM, начинаем graceful shutdown",
"signal", sig.String())
appState.SetNotReady()
start := time.Now()
for _, fn := range shutdownFns {
fn()
}
dur := time.Since(start).Seconds()
shutdownDuration.Set(dur) // promauto Gauge app_shutdown_duration_seconds
slog.InfoContext(ctx, "graceful shutdown завершён", "duration_s", dur)
context.Canceled и io.EOF от FetchMessage — нормальный выход consumer-горутины, логировать на Info, не Error (R-SHUT-OBS-X1).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
CommitInterval / авто-commit в kafka-go reader | R-SHUT-KFK-X1 | Явный CommitMessages после каждого handle |
Долгий cascade (HTTP + retry > 20s) в handle | R-SHUT-KFK-2 | Локальная TX + outbox-event; relay в отдельной горутине |
writer.Close() пропущен в shutdown | R-SHUT-KFK-4 | publisher.Close() перед pool.Close() в shutdown-последовательности |
pool.Close() до wg.Wait() | R-SHUT-DB-X1 | wg.Wait() — pgx не паникует при взятии соединения из закрытого пула |
slog.Error на context.Canceled из FetchMessage | R-SHUT-OBS-X1 | return nil на context.Canceled/io.EOF; alert-канал без шума на деплоях |
Consumer-горутина без WaitGroup | R-SHUT-KFK-1 | wg.Add(1) + defer wg.Done() — shutdown дожидается CommitMessages |
reader.Close() как единственная остановка без отмены контекста | R-SHUT-KFK-1 | cancelConsumer() + дожатие wg.Wait(); reader.Close() опционально после |
Куда дальше
- Рантайм-конфигурация graceful shutdown —
os.Signal,http.Server.Shutdown,health.State, taймауты. - HTTP drain —
srv.Shutdown(ctx), preStop sleep, долгие эндпоинты через 202. - БД и persistence — порядок
pool.Close(), транзакции в фоновых горутинах. - Scheduled / Async / outbox — outbox-relay,
ctx.Done()перед новой итерацией. - Идемпотентность in-flight —
Idempotency-Key,ON CONFLICT DO NOTHINGпоevent_id. - Бюджеты и observability — cumulative 60s,
app_shutdown_duration_seconds. - Kubernetes —
terminationGracePeriodSeconds: 60, preStop, probes.