---
title: "Retry topic + DLQ — non-blocking retry в NestJS/kafkajs"
nav_title: "Retry topic + DLQ"
excerpt: "Non-blocking retry для kafkajs: retry-топики с x-attempt, consumer.pause() до due-time, DLQ с alert. Blocking-retry в eachMessage держит partition."
keywords: "Kafka retry topic NestJS, kafkajs DLQ, non-blocking retry Node, x-attempt header, consumer.pause kafkajs, DLQ monitoring, R-KFK-RTRY, blocking retry запрещён"
focus_keyword: "Kafka retry topic DLQ NestJS"
tags:
  - kafka
  - nestjs
  - typescript
  - retry
  - dlq
  - node
---

# Retry topic + DLQ — non-blocking retry в NestJS/kafkajs

> **Опирается на правила:** `R-KFK-RTRY-1` … `R-KFK-RTRY-4` и `R-KFK-RTRY-X1` … `R-KFK-RTRY-X4` из Kafka Style Guide → [раздел 5. Retry topic + DLQ](/standards/backend/kafka/#5-retry-topic--dlq).

> **Важно знать**
> - **Blocking retry** (`setTimeout`-цикл / `await sleep` внутри `eachMessage`) — антипаттерн: держит partition, провоцирует rebalance, создаёт дубликаты.
> - **Non-blocking retry** — публикация в отдельный retry-топик из `catch`; основной `eachMessage` возвращает управление немедленно.
> - У kafkajs нет аналога `@RetryableTopic` — строим схему вручную: отдельный retry-consumer + `consumer.pause()` до due-time.
> - **`x-attempt`** в headers — обязателен: без счётчика попыток нет ограничения max-attempts.
> - **Retry только для transient**: сеть, 5xx, DB timeout. Контрактные ошибки и баги в коде — сразу в DLQ.
> - **DLQ-monitoring** — alert на размер; без него DLQ превращается в свалку потерянных событий.
> - **Replay из DLQ** — ручная операция: ревью события, потом re-publish в основной топик.
> - **Проглатывание exception** (`catch (e) { logger.error(e); } + commitOffsets`) — событие потеряно навсегда.

Kafka-consumer в Node должен пережить временные сбои внешних систем без потери сообщений и без блокировки poll-цикла. Retry topic + DLQ — стандартный паттерн UCP: каждая попытка — отдельный топик с возрастающим delay, после исчерпания попыток событие падает в DLQ для ручного разбора.

## Топология retry-топиков

`R-KFK-RTRY-1`: отдельные топики, не блокирующий retry.

```
orders.confirmed              ← основной
orders.confirmed.retry.1m     ← retry через 1 мин
orders.confirmed.retry.10m    ← retry через 10 мин
orders.confirmed.retry.1h     ← retry через 1 час
orders.confirmed.dlq          ← окончательный fail
```

Все топики создаются явно (Terraform / kafka-admin CLI). Auto-creation отключена: опечатка в имени создаст новый топик с дефолтными настройками.

## Основной consumer + публикация в retry

У kafkajs нет декоратора `@RetryableTopic`. Логику retry-роутинга выносим в отдельный сервис.

```ts
// orders-confirmed.consumer.ts
@Injectable()
export class OrdersConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: Kafka,
    private readonly billingService: BillingService,
    private readonly retryRouter: RetryRouter,
    @Inject(KAFKA_PRODUCER) private readonly producer: Producer,
  ) {
    this.consumer = kafka.consumer({
      groupId: 'billing-order-confirmed',
    });
  }

  async onApplicationBootstrap() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        const attempt = Number(message.headers?.['x-attempt'] ?? 1);
        const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));

        try {
          await this.billingService.charge(event.orderId, event.totalAmount);
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        } catch (err) {
          await this.retryRouter.route({
            topic,
            message,
            attempt,
            err,
            producer: this.producer,
          });
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        }
      },
    });
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
```

`commitOffsets` вызывается и в success-, и в error-ветке: ответственность за повторную доставку переходит к retry-топику, основной топик продвигается вперёд. Poll-цикл не блокируется.

## RetryRouter — классификация и маршрутизация

`R-KFK-RTRY-2`: retry только для transient failures.

```ts
// retry-router.service.ts
import { KafkaJSConnectionError, KafkaJSRequestTimeout } from 'kafkajs';

export type RetryRouteOptions = {
  topic: string;
  message: KafkaMessage;
  attempt: number;
  err: unknown;
  producer: Producer;
};

const RETRY_SCHEDULE = ['1m', '10m', '1h'] as const;
const MAX_ATTEMPTS = RETRY_SCHEDULE.length + 1;

@Injectable()
export class RetryRouter {
  constructor(private readonly logger: Logger) {}

  async route({ topic, message, attempt, err, producer }: RetryRouteOptions) {
    if (!isTransient(err)) {
      await this.sendToDlq(topic, message, attempt, err, producer);
      return;
    }

    if (attempt >= MAX_ATTEMPTS) {
      await this.sendToDlq(topic, message, attempt, err, producer);
      return;
    }

    const delay = RETRY_SCHEDULE[attempt - 1];
    const retryTopic = `${topic}.retry.${delay}`;
    const dueAt = Date.now() + parseDuration(delay);

    await producer.send({
      topic: retryTopic,
      acks: -1,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'x-attempt': String(attempt + 1),
            'x-due-at': String(dueAt),
            'x-original-topic': topic,
          },
        },
      ],
    });

    this.logger.warn(`Event routed to ${retryTopic}, attempt=${attempt + 1}`);
  }

  private async sendToDlq(topic: string, message: KafkaMessage, attempt: number, err: unknown, producer: Producer) {
    const dlqTopic = `${topic}.dlq`;
    await producer.send({
      topic: dlqTopic,
      acks: -1,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'x-attempt': String(attempt),
            'x-error': String(err instanceof Error ? err.message : err),
            'x-original-topic': topic,
          },
        },
      ],
    });
    this.logger.error(`Event sent to DLQ: ${dlqTopic}, attempt=${attempt}`, err);
  }
}

function isTransient(err: unknown): boolean {
  if (err instanceof KafkaJSConnectionError) return true;
  if (err instanceof KafkaJSRequestTimeout) return true;
  if (err instanceof Error && 'response' in err && (err as any).response?.status >= 500) return true;  // HTTP 5xx
  if (err instanceof Error && /timeout|deadlock/i.test(err.message)) return true;  // DB timeout / deadlock
  return false;
}
```

Таблица классификации ошибок:

| Тип ошибки | Retry? | Причина |
|---|---|---|
| `KafkaJSConnectionError`, `KafkaJSRequestTimeout` | Да | сетевая проблема, временная |
| HTTP 5xx от downstream (`response.status >= 500`) | Да | сервер перегружен или недоступен |
| DB timeout / deadlock (проверка по `err.message`) | Да | DB лагает, восстановится |
| HTTP 4xx от downstream | **Нет** | контракт нарушен, retry не исправит |
| Ошибка zod-парсинга | **Нет** | payload невалиден, retry бессмыслен |
| `TypeError`, `ReferenceError` | **Нет** | баг в коде, retry не поможет |
| Нарушение бизнес-инварианта | **Нет** | бизнес-логика отказала, не временно |

## Retry-consumer с `consumer.pause()`

Retry-топики (`*.retry.1m`, `*.retry.10m`, `*.retry.1h`) нужен отдельный consumer. Он проверяет `x-due-at` и ждёт нужного времени через `consumer.pause()` без блокировки poll-цикла.

```ts
// orders-confirmed-retry.consumer.ts
@Injectable()
export class OrdersConfirmedRetryConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(
    private readonly kafka: Kafka,
    private readonly billingService: BillingService,
    private readonly retryRouter: RetryRouter,
    @Inject(KAFKA_PRODUCER) private readonly producer: Producer,
  ) {
    this.consumer = kafka.consumer({
      groupId: 'billing-order-confirmed-retry',
    });
  }

  async onApplicationBootstrap() {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topics: [
        'orders.confirmed.retry.1m',
        'orders.confirmed.retry.10m',
        'orders.confirmed.retry.1h',
      ],
      fromBeginning: true,
    });

    await this.consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const dueAt = Number(message.headers?.['x-due-at'] ?? 0);
        const now = Date.now();

        if (dueAt > now) {
          this.consumer.pause([{ topic, partitions: [partition] }]);
          setTimeout(() => {
            this.consumer.resume([{ topic, partitions: [partition] }]);
          }, dueAt - now);
          return;
        }

        const attempt = Number(message.headers?.['x-attempt'] ?? 1);
        const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));

        try {
          await this.billingService.charge(event.orderId, event.totalAmount);
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        } catch (err) {
          await this.retryRouter.route({
            topic: message.headers?.['x-original-topic']?.toString() ?? topic,
            message,
            attempt,
            err,
            producer: this.producer,
          });
          await this.consumer.commitOffsets([
            { topic, partition, offset: String(Number(message.offset) + 1) },
          ]);
        }
      },
    });
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
```

`consumer.pause()` приостанавливает poll конкретной partition, не блокируя event loop и другие partition'ы. `setTimeout` возобновляет poll ровно тогда, когда due-time наступил.

## DLQ monitoring

`R-KFK-RTRY-3`: alert на размер DLQ — обязателен.

```ts
// dlq-monitor.service.ts
@Injectable()
export class DlqMonitorService implements OnApplicationBootstrap {
  private readonly dlqTopics = [
    'orders.confirmed.dlq',
    'payments.initiated.dlq',
    'customers.registered.dlq',
  ];

  constructor(
    private readonly kafka: Kafka,
    private readonly metrics: MetricsService,
  ) {}

  async onApplicationBootstrap() {
    const admin = this.kafka.admin();
    await admin.connect();

    setInterval(async () => {
      for (const topic of this.dlqTopics) {
        const offsets = await admin.fetchTopicOffsets(topic);
        const lag = offsets.reduce((sum, p) => sum + Number(p.high) - Number(p.low), 0);
        this.metrics.gauge('kafka_dlq_size', lag, { topic });
      }
    }, 30_000);
  }
}
```

Prometheus-alert (пример для orders):

```yaml
- alert: KafkaDlqBacklog
  expr: kafka_dlq_size{topic="orders.confirmed.dlq"} > 1
  for: 5m
  annotations:
    summary: "DLQ {{ $labels.topic }} содержит необработанные события"
    runbook: https://runbooks.internal/kafka-dlq
```

Threshold зависит от критичности: для money-событий (`orders`, `payments`) — `> 1` (одно событие = инцидент); для аналитики — `> 1000`.

## Replay из DLQ

`R-KFK-RTRY-4`: ручная операция, не автоматическая.

```ts
// dlq-replay.controller.ts
@Controller('admin/dlq')
export class DlqReplayController {
  constructor(private readonly replayService: DlqReplayService) {}

  @Post(':topic/replay')
  async replay(
    @Param('topic') topic: string,
    @Body() dto: ReplayRequestDto,
  ): Promise<ReplayResultDto> {
    return this.replayService.replay(topic, dto);
  }
}

// dlq-replay.service.ts
@Injectable()
export class DlqReplayService {
  constructor(
    private readonly kafka: Kafka,
    @Inject(KAFKA_PRODUCER) private readonly producer: Producer,
  ) {}

  async replay(dlqTopic: string, dto: ReplayRequestDto): Promise<ReplayResultDto> {
    const originalTopic = dlqTopic.replace(/\.dlq$/, '');
    const consumer = this.kafka.consumer({ groupId: `dlq-replay-${Date.now()}` });
    const producer = this.producer;

    await consumer.connect();
    await consumer.subscribe({ topic: dlqTopic, fromBeginning: true });

    const replayed: string[] = [];

    await consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const eventId = message.headers?.['x-event-id']?.toString();
        if (dto.eventIds && !dto.eventIds.includes(eventId!)) return;

        await producer.send({
          topic: originalTopic,
          acks: -1,
          messages: [
            {
              key: message.key,
              value: message.value,
              headers: { ...message.headers, 'x-attempt': '1', 'x-replayed-from-dlq': 'true' },
            },
          ],
        });

        replayed.push(eventId ?? message.offset.toString());
        await consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });

    await consumer.disconnect();
    return { replayed };
  }
}
```

Почему replay не автоматический:
- В DLQ может оказаться событие с багом в обработчике. Автоматический возврат в основной топик → снова баг → снова DLQ. Бесконечный цикл.
- Corrupted payload — replay бессмыслен, нужен `DELETE`.
- Плановое восстановление после инцидента (payment-provider вернулся через 6 часов) — операционное решение, а не автоматика.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `await sleep(N)` / `setTimeout`-цикл в `eachMessage` | `R-KFK-RTRY-X1` | публикация в retry-топик + отдельный retry-consumer |
| `catch (e) { logger.error(e); }` + `commitOffsets` | `R-KFK-RTRY-X2` | throw / публикация в retry-топик или DLQ явно |
| Retry-топик без проверки `x-attempt` | `R-KFK-RTRY-X3` | проверка `x-attempt >= MAX_ATTEMPTS` → DLQ |
| DLQ без мониторинга | `R-KFK-RTRY-X4` | `kafka_dlq_size` метрика + Prometheus alert |
| Retry на 4xx / zod-ошибку | `R-KFK-RTRY-2` | `isTransient(err) === false` → сразу DLQ |
| Авто-replay из DLQ | `R-KFK-RTRY-4` | ручная операция через `admin/dlq/:topic/replay` |
| `autoCreateTopics` в проде | `R-KFK-RTRY-1` | явное создание топиков |
| Один retry-топик на весь сервис | `R-KFK-RTRY-1` | per-consumer retry-топики |

## Куда дальше

- [Consumer](node/consumer.md) — почему blocking retry ломает poll-цикл и как работает `consumer.pause()`.
- [Idempotent consumer](node/idempotent-consumer.md) — replay из DLQ создаёт дубль; dedup через `processed_event`.
- [Observability](node/observability.md) — consumer lag, DLQ size alerts через `prom-client`.
- [Outbox publishing](node/outbox-publishing.md) — почему domain-события через outbox, не `producer.send` из handler.
- [Producer](node/producer.md) — `idempotent: true`, `acks: -1`, publish в retry-топик через тот же producer.
- [Конфигурация](node/configuration.md) — `KafkaConfig` с zod, fail-fast на отсутствующий топик.
- [Event design](node/event-design.md) — `x-attempt` и `x-due-at` как часть envelope, не payload.
- [Security](node/security.md) — ACL на retry и DLQ топики.
