Опирается на правила: R-KFK-RTRY-1R-KFK-RTRY-4 и R-KFK-RTRY-X1R-KFK-RTRY-X4 из Kafka Style Guide → раздел 5. Retry topic + DLQ.

Важно знать

  • Blocking retry (setTimeout-цикл / await sleep внутри eachMessage) — антипаттерн: держит partition, провоцирует rebalance, создаёт дубликаты.
  • Non-blocking retry — публикация в отдельный retry-топик из catch; основной eachMessage возвращает управление немедленно.
  • У kafkajs нет аналога @RetryableTopic — строим схему вручную: отдельный retry-consumer + consumer.pause() до due-time.
  • x-attempt в headers — обязателен: без счётчика попыток нет ограничения max-attempts.
  • Retry только для transient: сеть, 5xx, DB timeout. Контрактные ошибки и баги в коде — сразу в DLQ.
  • DLQ-monitoring — alert на размер; без него DLQ превращается в свалку потерянных событий.
  • Replay из DLQ — ручная операция: ревью события, потом re-publish в основной топик.
  • Проглатывание exception (catch (e) { logger.error(e); } + commitOffsets) — событие потеряно навсегда.

Kafka-consumer в Node должен пережить временные сбои внешних систем без потери сообщений и без блокировки poll-цикла. Retry topic + DLQ — стандартный паттерн UCP: каждая попытка — отдельный топик с возрастающим delay, после исчерпания попыток событие падает в DLQ для ручного разбора.

Топология retry-топиков

R-KFK-RTRY-1: отдельные топики, не блокирующий retry.

orders.confirmed              ← основной
orders.confirmed.retry.1m     ← retry через 1 мин
orders.confirmed.retry.10m    ← retry через 10 мин
orders.confirmed.retry.1h     ← retry через 1 час
orders.confirmed.dlq          ← окончательный fail

Все топики создаются явно (Terraform / kafka-admin CLI). Auto-creation отключена: опечатка в имени создаст новый топик с дефолтными настройками.

Основной consumer + публикация в retry

У kafkajs нет декоратора @RetryableTopic. Логику retry-роутинга выносим в отдельный сервис.

// orders-confirmed.consumer.ts
@Injectable()
export class OrdersConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClient,
    private readonly billingService: BillingService,
    private readonly retryRouter: RetryRouter,
  ) {
    this.consumer = kafka.instance.consumer({
      groupId: 'billing-order-confirmed',
    });
  }

  async onApplicationBootstrap() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        const attempt = Number(message.headers?.['x-attempt'] ?? 1);
        const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));

        try {
          await this.billingService.charge(event.orderId, event.totalAmount);
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        } catch (err) {
          await this.retryRouter.route({
            topic,
            message,
            attempt,
            err,
            producer: this.kafka.producer,
          });
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        }
      },
    });
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

commitOffsets вызывается и в success-, и в error-ветке: ответственность за повторную доставку переходит к retry-топику, основной топик продвигается вперёд. Poll-цикл не блокируется.

RetryRouter — классификация и маршрутизация

R-KFK-RTRY-2: retry только для transient failures.

// retry-router.service.ts
export type RetryRouteOptions = {
  topic: string;
  message: KafkaMessage;
  attempt: number;
  err: unknown;
  producer: Producer;
};

const RETRY_SCHEDULE = ['1m', '10m', '1h'] as const;
const MAX_ATTEMPTS = RETRY_SCHEDULE.length + 1;

@Injectable()
export class RetryRouter {
  constructor(private readonly logger: Logger) {}

  async route({ topic, message, attempt, err, producer }: RetryRouteOptions) {
    if (!isTransient(err)) {
      await this.sendToDlq(topic, message, attempt, err, producer);
      return;
    }

    if (attempt >= MAX_ATTEMPTS) {
      await this.sendToDlq(topic, message, attempt, err, producer);
      return;
    }

    const delay = RETRY_SCHEDULE[attempt - 1];
    const retryTopic = `${topic}.retry.${delay}`;
    const dueAt = Date.now() + parseDuration(delay);

    await producer.send({
      topic: retryTopic,
      acks: -1,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'x-attempt': String(attempt + 1),
            'x-due-at': String(dueAt),
            'x-original-topic': topic,
          },
        },
      ],
    });

    this.logger.warn(`Event routed to ${retryTopic}, attempt=${attempt + 1}`);
  }

  private async sendToDlq(topic: string, message: KafkaMessage, attempt: number, err: unknown, producer: Producer) {
    const dlqTopic = `${topic}.dlq`;
    await producer.send({
      topic: dlqTopic,
      acks: -1,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'x-attempt': String(attempt),
            'x-error': String(err instanceof Error ? err.message : err),
            'x-original-topic': topic,
          },
        },
      ],
    });
    this.logger.error(`Event sent to DLQ: ${dlqTopic}, attempt=${attempt}`, err);
  }
}

function isTransient(err: unknown): boolean {
  if (err instanceof NetworkError) return true;
  if (err instanceof KafkaJSConnectionError) return true;
  if (err instanceof DownstreamServerError) return true;   // HTTP 5xx
  if (err instanceof DbTimeoutError) return true;
  return false;
}

Таблица классификации ошибок:

Тип ошибкиRetry?Причина
KafkaJSConnectionError, NetworkErrorДасетевая проблема, временная
HTTP 5xx от downstreamДасервер перегружен или недоступен
DbTimeoutErrorДаDB лагает, восстановится
HTTP 4xx от downstreamНетконтракт нарушен, retry не исправит
Ошибка zod-парсингаНетpayload невалиден, retry бессмыслен
TypeError, ReferenceErrorНетбаг в коде, retry не поможет
Нарушение бизнес-инвариантаНетбизнес-логика отказала, не временно

Retry-consumer с consumer.pause()

Retry-топики (*.retry.1m, *.retry.10m, *.retry.1h) нужен отдельный consumer. Он проверяет x-due-at и ждёт нужного времени через consumer.pause() без блокировки poll-цикла.

// orders-confirmed-retry.consumer.ts
@Injectable()
export class OrdersConfirmedRetryConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClient,
    private readonly billingService: BillingService,
    private readonly retryRouter: RetryRouter,
  ) {
    this.consumer = kafka.instance.consumer({
      groupId: 'billing-order-confirmed-retry',
    });
  }

  async onApplicationBootstrap() {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topics: [
        'orders.confirmed.retry.1m',
        'orders.confirmed.retry.10m',
        'orders.confirmed.retry.1h',
      ],
      fromBeginning: true,
    });

    await this.consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const dueAt = Number(message.headers?.['x-due-at'] ?? 0);
        const now = Date.now();

        if (dueAt > now) {
          this.consumer.pause([{ topic, partitions: [partition] }]);
          setTimeout(() => {
            this.consumer.resume([{ topic, partitions: [partition] }]);
          }, dueAt - now);
          return;
        }

        const attempt = Number(message.headers?.['x-attempt'] ?? 1);
        const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));

        try {
          await this.billingService.charge(event.orderId, event.totalAmount);
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        } catch (err) {
          await this.retryRouter.route({
            topic: message.headers?.['x-original-topic']?.toString() ?? topic,
            message,
            attempt,
            err,
            producer: this.kafka.producer,
          });
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        }
      },
    });
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

consumer.pause() приостанавливает poll конкретной partition, не блокируя event loop и другие partition'ы. setTimeout возобновляет poll ровно тогда, когда due-time наступил.

DLQ monitoring

R-KFK-RTRY-3: alert на размер DLQ — обязателен.

// dlq-monitor.service.ts
@Injectable()
export class DlqMonitorService implements OnApplicationBootstrap {
  private readonly dlqTopics = [
    'orders.confirmed.dlq',
    'payments.initiated.dlq',
    'customers.registered.dlq',
  ];

  constructor(
    private readonly kafka: KafkaClient,
    private readonly metrics: MetricsService,
  ) {}

  async onApplicationBootstrap() {
    const admin = this.kafka.instance.admin();
    await admin.connect();

    setInterval(async () => {
      for (const topic of this.dlqTopics) {
        const offsets = await admin.fetchTopicOffsets(topic);
        const lag = offsets.reduce((sum, p) => sum + Number(p.high) - Number(p.low), 0);
        this.metrics.gauge('kafka_dlq_size', lag, { topic });
      }
    }, 30_000);
  }
}

Prometheus-alert (пример для orders):

- alert: KafkaDlqBacklog
  expr: kafka_dlq_size{topic="orders.confirmed.dlq"} > 1
  for: 5m
  annotations:
    summary: "DLQ {{ $labels.topic }} содержит необработанные события"
    runbook: https://runbooks.internal/kafka-dlq

Threshold зависит от критичности: для money-событий (orders, payments) — > 1 (одно событие = инцидент); для аналитики — > 1000.

Replay из DLQ

R-KFK-RTRY-4: ручная операция, не автоматическая.

// dlq-replay.controller.ts
@Controller('admin/dlq')
export class DlqReplayController {
  constructor(private readonly replayService: DlqReplayService) {}

  @Post(':topic/replay')
  async replay(
    @Param('topic') topic: string,
    @Body() dto: ReplayRequestDto,
  ): Promise<ReplayResultDto> {
    return this.replayService.replay(topic, dto);
  }
}

// dlq-replay.service.ts
@Injectable()
export class DlqReplayService {
  constructor(private readonly kafka: KafkaClient) {}

  async replay(dlqTopic: string, dto: ReplayRequestDto): Promise<ReplayResultDto> {
    const originalTopic = dlqTopic.replace(/\.dlq$/, '');
    const consumer = this.kafka.instance.consumer({ groupId: `dlq-replay-${Date.now()}` });
    const producer = this.kafka.producer;

    await consumer.connect();
    await consumer.subscribe({ topic: dlqTopic, fromBeginning: true });

    const replayed: string[] = [];

    await consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const eventId = message.headers?.['x-event-id']?.toString();
        if (dto.eventIds && !dto.eventIds.includes(eventId!)) return;

        await producer.send({
          topic: originalTopic,
          acks: -1,
          messages: [
            {
              key: message.key,
              value: message.value,
              headers: { ...message.headers, 'x-attempt': '1', 'x-replayed-from-dlq': 'true' },
            },
          ],
        });

        replayed.push(eventId ?? message.offset.toString());
        await consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });

    await consumer.disconnect();
    return { replayed };
  }
}

Почему replay не автоматический:

  • В DLQ может оказаться событие с багом в обработчике. Автоматический возврат в основной топик → снова баг → снова DLQ. Бесконечный цикл.
  • Corrupted payload — replay бессмыслен, нужен DELETE.
  • Плановое восстановление после инцидента (payment-provider вернулся через 6 часов) — операционное решение, а не автоматика.

Что запрещено

АнтипаттернПравилоЧто взамен
await sleep(N) / setTimeout-цикл в eachMessageR-KFK-RTRY-X1публикация в retry-топик + отдельный retry-consumer
catch (e) { logger.error(e); } + commitOffsetsR-KFK-RTRY-X2throw / публикация в retry-топик или DLQ явно
Retry-топик без проверки x-attemptR-KFK-RTRY-X3проверка x-attempt >= MAX_ATTEMPTS → DLQ
DLQ без мониторингаR-KFK-RTRY-X4kafka_dlq_size метрика + Prometheus alert
Retry на 4xx / zod-ошибкуR-KFK-RTRY-2isTransient(err) === false → сразу DLQ
Авто-replay из DLQR-KFK-RTRY-4ручная операция через admin/dlq/:topic/replay
autoCreateTopics в продеR-KFK-RTRY-1явное создание топиков
Один retry-топик на весь сервисR-KFK-RTRY-1per-consumer retry-топики

Куда дальше

  • Consumer — почему blocking retry ломает poll-цикл и как работает consumer.pause().
  • Idempotent consumer — replay из DLQ создаёт дубль; dedup через processed_event.
  • Observability — consumer lag, DLQ size alerts через prom-client.
  • Outbox publishing — почему domain-события через outbox, не producer.send из handler.
  • Producer — idempotent: true, acks: -1, publish в retry-топик через тот же producer.
  • Конфигурация — KafkaConfig с zod, fail-fast на отсутствующий топик.
  • Event design — x-attempt и x-due-at как часть envelope, не payload.
  • Security — ACL на retry и DLQ топики.