← назад к разделу

Когда сервис получает сигнал остановки (SIGTERM), есть риск потерять сообщения: потребитель мог получить событие из Kafka, но не успеть его обработать и зафиксировать offset — после перезапуска Kafka отдаст это сообщение снова. Продюсер может потерять несколько сообщений, которые ещё не успели уйти на broker.

В Spring Boot эти детали скрыты в ConcurrentMessageListenerContainer: контейнер сам дожимает текущую пачку и коммитит offset. В Go ту же работу нужно написать руками — это не сложнее, но требует понять правильный порядок действий.

Как работает остановка потребителя

Потребитель в kafka-go читает сообщения в бесконечном цикле через FetchMessage. Чтобы остановить этот цикл чисто, используют context.Context: когда на SIGTERM отменяется контекст, FetchMessage возвращает ошибку context.Canceled — и это нормальный выход, а не сбой.

Важная деталь: в kafka-go нет автоматического коммита offset. Offset нужно фиксировать явно вызовом CommitMessages — после каждого успешного обработанного сообщения.

func (c *OrderConsumer) Run(ctx context.Context) error {
    for {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
                return nil // нормальный выход при остановке
            }
            return fmt.Errorf("fetch message: %w", err)
        }

        if err := c.handle(ctx, msg); err != nil {
            return fmt.Errorf("handle order event: %w", err)
        }

        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            return fmt.Errorf("commit offset: %w", err)
        }
    }
}

Когда контекст отменяется, FetchMessage завершится только после того, как текущее сообщение уже обработано и offset зафиксирован. Replay не возникает.

Ошибки context.Canceled и io.EOF — это нормальный выход потребителя при остановке, их не нужно логировать как ошибку. Иначе alert-канал будет шуметь при каждом деплое.

Регистрация горутины в WaitGroup

Чтобы основной процесс дождался завершения потребителя перед закрытием пула соединений, горутину регистрируют в sync.WaitGroup:

consumerCtx, cancelConsumer := context.WithCancel(ctx)

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    if err := orderConsumer.Run(consumerCtx); err != nil {
        slog.ErrorContext(ctx, "order consumer exited with error", "error", err)
    }
}()

wg.Add(1) стоит до запуска горутины — так нет гонки между стартом горутины и вызовом wg.Wait() в shutdown.

Что делать, если сообщение придёт повторно

Если SIGTERM пришёл после успешного handle, но до CommitMessages, то при следующем запуске сервиса Kafka отдаст это сообщение ещё раз. Поэтому обработчик должен быть идемпотентным — повторная обработка одного и того же события не должна приводить к двойному эффекту.

Стандартный приём — таблица уже обработанных событий:

func (c *OrderConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal order event: %w", err)
    }

    tx, err := c.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    q := c.queries.WithTx(tx)

    inserted, err := q.InsertProcessedEvent(ctx, db.InsertProcessedEventParams{
        EventID:       event.ID,
        ConsumerGroup: "billing-confirmations",
    })
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }
    if !inserted {
        return nil // уже обработали раньше
    }

    if err := c.orders.RecordConfirmation(ctx, event.OrderID, event.TotalAmount); err != nil {
        return fmt.Errorf("record confirmation: %w", err)
    }

    return tx.Commit(ctx)
}

InsertProcessedEvent использует ON CONFLICT (event_id, consumer_group) DO UPDATE ... RETURNING (xmax = 0) AS inserted. При первой вставке возвращает true, при повторе — false. Важно, что проверка дубля и сама бизнес-логика находятся в одной транзакции: если RecordConfirmation упадёт, транзакция откатится и событие будет переобработано при следующем запуске.

Почему нельзя делать долгий HTTP-запрос внутри handle

Распространённая ошибка — вызвать из обработчика внешний HTTP-сервис с несколькими retry:

// Опасный вариант — так делать не нужно
func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event ProductPriceChangedEvent
    _ = json.Unmarshal(msg.Value, &event)

    // HTTP + retry → может занять 20-30 секунд
    if err := c.catalogClient.UpdatePrice(ctx, event.ProductID, event.NewPrice); err != nil {
        return fmt.Errorf("update price: %w", err)
    }
    return nil
}

При SIGTERM контекст отменяется и HTTP-клиент получает context.Canceled. Обработка окажется частичной, а shutdown зависнет на время таймаута.

Правильное решение — писать только в базу и класть событие в outbox-таблицу. Отдельная горутина-relay заберёт его и отправит HTTP уже независимо:

func (c *ProductConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var event ProductPriceChangedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal product event: %w", err)
    }

    return sqlcTx(ctx, c.pool, func(q *db.Queries) error {
        if err := q.UpdateProductPrice(ctx, db.UpdateProductPriceParams{
            ProductID: event.ProductID,
            Price:     event.NewPrice,
        }); err != nil {
            return fmt.Errorf("update product price: %w", err)
        }
        return q.InsertOutboxEvent(ctx, db.InsertOutboxEventParams{
            EventType: "ProductPriceChanged",
            Payload:   msg.Value,
        })
    })
}

Так handle завершается за несколько миллисекунд, а relay работает на своём бюджете.

Остановка продюсера: зачем нужен writer.Close()

kafka.Writer накапливает сообщения в пачку и отправляет их на broker асинхронно. Если процесс завершится без явного вызова Close(), сообщения из текущей незавершённой пачки просто потеряются — они не попадут в Kafka.

type OrderPublisher struct {
    writer *kafka.Writer
}

func NewOrderPublisher(brokers []string) *OrderPublisher {
    return &OrderPublisher{
        writer: &kafka.Writer{
            Addr:         kafka.TCP(brokers...),
            Topic:        "orders.confirmed",
            Balancer:     &kafka.LeastBytes{},
            BatchTimeout: 5 * time.Millisecond,
        },
    }
}

func (p *OrderPublisher) Publish(ctx context.Context, event OrderConfirmedEvent) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal order confirmed: %w", err)
    }
    return p.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.OrderID),
        Value: payload,
    })
}

func (p *OrderPublisher) Close() error {
    return p.writer.Close()
}

writer.Close() блокируется до тех пор, пока все накопленные сообщения не отправлены на broker. После этого он возвращается — и только тогда можно закрывать остальные ресурсы.

Правильный порядок shutdown

Порядок остановки компонентов имеет значение. Нельзя закрыть пул соединений до того, как потребитель завершил CommitMessages — pgx запаникует при попытке взять соединение из закрытого пула.

shutdownFns := []func(){
    func() { appState.SetNotReady() },   // перестаём принимать трафик
    func() { cancelConsumer() },          // сигнализируем потребителю остановиться
    func() { wg.Wait() },                // ждём, пока ConsumerMessages завершится
    func() {
        if err := orderPublisher.Close(); err != nil {
            slog.ErrorContext(ctx, "kafka writer close", "error", err)
        }
    },                                    // сбрасываем pending batch продюсера
    func() { srv.Shutdown(shutCtx) },    // HTTP: дожидаемся текущих запросов
    func() { pool.Close() },             // БД — самый последний
}

Логика проста: сначала останавливаем то, что принимает новую работу, затем дожидаемся завершения текущей, и только потом закрываем разделяемые ресурсы (HTTP, БД).

Частые ошибки

Автокоммит через CommitInterval. В kafka-go есть опция CommitInterval на kafka.Reader, которая автоматически фиксирует offset с заданным интервалом. Это удобно, но опасно: offset может зафиксироваться до того, как сообщение реально обработано. При SIGTERM часть сообщений будет считаться обработанной, хотя обработчик до них не добрался. Нужен явный CommitMessages после каждого успешного handle.

Запустить горутину без WaitGroup. Если не дождаться завершения потребителя перед pool.Close(), горутина может попытаться взять соединение из уже закрытого пула.

Пропустить writer.Close(). Без явного закрытия writer'а несколько последних сообщений останутся в буфере и не попадут в Kafka.

Логировать context.Canceled как ошибку. Это нормальный способ завершения потребителя — не нужно включать в alert.

Коротко

  • Потребитель останавливается через отмену context.ContextFetchMessage возвращает context.Canceled, и это нормальный выход.
  • В kafka-go нет автокоммита: offset нужно фиксировать явно через CommitMessages после каждого успешного handle.
  • Горутина-потребитель регистрируется в sync.WaitGroup — shutdown ждёт её через wg.Wait() перед закрытием пула.
  • Если SIGTERM пришёл между handle и CommitMessages, сообщение придёт повторно — обработчик должен быть идемпотентным.
  • Долгие HTTP-запросы с retry из handle опасны: при отмене контекста обработка будет частичной. Используй outbox-паттерн.
  • writer.Close() блокируется и сбрасывает накопленную пачку на broker — без него последние сообщения теряются.
  • Порядок shutdown: отмена контекста → wg.Wait()writer.Close() → HTTP drain → pool.Close().

Что почитать дальше

  • HTTP drain в Go — srv.Shutdown(ctx), preStop sleep, долгие эндпоинты.
  • БД и persistence в Go — порядок pool.Close(), транзакции в фоновых горутинах.
  • Scheduled / Async / outbox в Go — outbox-relay, ctx.Done() перед новой итерацией.
  • Бюджеты и observability в Go — cumulative 60s, метрика app_shutdown_duration_seconds.