Опирается на правила:
R-KFK-RTRY-1…R-KFK-RTRY-4иR-KFK-RTRY-X1…R-KFK-RTRY-X4из Kafka Style Guide → раздел 5. Retry topic + DLQ.
Важно знать
- Blocking retry (
setTimeout-цикл /await sleepвнутриeachMessage) — антипаттерн: держит partition, провоцирует rebalance, создаёт дубликаты.- Non-blocking retry — публикация в отдельный retry-топик из
catch; основнойeachMessageвозвращает управление немедленно.- У kafkajs нет аналога
@RetryableTopic— строим схему вручную: отдельный retry-consumer +consumer.pause()до due-time.x-attemptв headers — обязателен: без счётчика попыток нет ограничения max-attempts.- Retry только для transient: сеть, 5xx, DB timeout. Контрактные ошибки и баги в коде — сразу в DLQ.
- DLQ-monitoring — alert на размер; без него DLQ превращается в свалку потерянных событий.
- Replay из DLQ — ручная операция: ревью события, потом re-publish в основной топик.
- Проглатывание exception (
catch (e) { logger.error(e); } + commitOffsets) — событие потеряно навсегда.
Kafka-consumer в Node должен пережить временные сбои внешних систем без потери сообщений и без блокировки poll-цикла. Retry topic + DLQ — стандартный паттерн UCP: каждая попытка — отдельный топик с возрастающим delay, после исчерпания попыток событие падает в DLQ для ручного разбора.
Топология retry-топиков
R-KFK-RTRY-1: отдельные топики, не блокирующий retry.
orders.confirmed ← основной
orders.confirmed.retry.1m ← retry через 1 мин
orders.confirmed.retry.10m ← retry через 10 мин
orders.confirmed.retry.1h ← retry через 1 час
orders.confirmed.dlq ← окончательный fail
Все топики создаются явно (Terraform / kafka-admin CLI). Auto-creation отключена: опечатка в имени создаст новый топик с дефолтными настройками.
Основной consumer + публикация в retry
У kafkajs нет декоратора @RetryableTopic. Логику retry-роутинга выносим в отдельный сервис.
// orders-confirmed.consumer.ts
@Injectable()
export class OrdersConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
private readonly consumer: Consumer;
constructor(
private readonly kafka: KafkaClient,
private readonly billingService: BillingService,
private readonly retryRouter: RetryRouter,
) {
this.consumer = kafka.instance.consumer({
groupId: 'billing-order-confirmed',
});
}
async onApplicationBootstrap() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
await this.consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const attempt = Number(message.headers?.['x-attempt'] ?? 1);
const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
try {
await this.billingService.charge(event.orderId, event.totalAmount);
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
} catch (err) {
await this.retryRouter.route({
topic,
message,
attempt,
err,
producer: this.kafka.producer,
});
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
}
},
});
}
async onApplicationShutdown() {
await this.consumer.disconnect();
}
}
commitOffsets вызывается и в success-, и в error-ветке: ответственность за повторную доставку переходит к retry-топику, основной топик продвигается вперёд. Poll-цикл не блокируется.
RetryRouter — классификация и маршрутизация
R-KFK-RTRY-2: retry только для transient failures.
// retry-router.service.ts
export type RetryRouteOptions = {
topic: string;
message: KafkaMessage;
attempt: number;
err: unknown;
producer: Producer;
};
const RETRY_SCHEDULE = ['1m', '10m', '1h'] as const;
const MAX_ATTEMPTS = RETRY_SCHEDULE.length + 1;
@Injectable()
export class RetryRouter {
constructor(private readonly logger: Logger) {}
async route({ topic, message, attempt, err, producer }: RetryRouteOptions) {
if (!isTransient(err)) {
await this.sendToDlq(topic, message, attempt, err, producer);
return;
}
if (attempt >= MAX_ATTEMPTS) {
await this.sendToDlq(topic, message, attempt, err, producer);
return;
}
const delay = RETRY_SCHEDULE[attempt - 1];
const retryTopic = `${topic}.retry.${delay}`;
const dueAt = Date.now() + parseDuration(delay);
await producer.send({
topic: retryTopic,
acks: -1,
messages: [
{
key: message.key,
value: message.value,
headers: {
...message.headers,
'x-attempt': String(attempt + 1),
'x-due-at': String(dueAt),
'x-original-topic': topic,
},
},
],
});
this.logger.warn(`Event routed to ${retryTopic}, attempt=${attempt + 1}`);
}
private async sendToDlq(topic: string, message: KafkaMessage, attempt: number, err: unknown, producer: Producer) {
const dlqTopic = `${topic}.dlq`;
await producer.send({
topic: dlqTopic,
acks: -1,
messages: [
{
key: message.key,
value: message.value,
headers: {
...message.headers,
'x-attempt': String(attempt),
'x-error': String(err instanceof Error ? err.message : err),
'x-original-topic': topic,
},
},
],
});
this.logger.error(`Event sent to DLQ: ${dlqTopic}, attempt=${attempt}`, err);
}
}
function isTransient(err: unknown): boolean {
if (err instanceof NetworkError) return true;
if (err instanceof KafkaJSConnectionError) return true;
if (err instanceof DownstreamServerError) return true; // HTTP 5xx
if (err instanceof DbTimeoutError) return true;
return false;
}
Таблица классификации ошибок:
| Тип ошибки | Retry? | Причина |
|---|---|---|
KafkaJSConnectionError, NetworkError | Да | сетевая проблема, временная |
| HTTP 5xx от downstream | Да | сервер перегружен или недоступен |
DbTimeoutError | Да | DB лагает, восстановится |
| HTTP 4xx от downstream | Нет | контракт нарушен, retry не исправит |
| Ошибка zod-парсинга | Нет | payload невалиден, retry бессмыслен |
TypeError, ReferenceError | Нет | баг в коде, retry не поможет |
| Нарушение бизнес-инварианта | Нет | бизнес-логика отказала, не временно |
Retry-consumer с consumer.pause()
Retry-топики (*.retry.1m, *.retry.10m, *.retry.1h) нужен отдельный consumer. Он проверяет x-due-at и ждёт нужного времени через consumer.pause() без блокировки poll-цикла.
// orders-confirmed-retry.consumer.ts
@Injectable()
export class OrdersConfirmedRetryConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
private readonly consumer: Consumer;
constructor(
private readonly kafka: KafkaClient,
private readonly billingService: BillingService,
private readonly retryRouter: RetryRouter,
) {
this.consumer = kafka.instance.consumer({
groupId: 'billing-order-confirmed-retry',
});
}
async onApplicationBootstrap() {
await this.consumer.connect();
await this.consumer.subscribe({
topics: [
'orders.confirmed.retry.1m',
'orders.confirmed.retry.10m',
'orders.confirmed.retry.1h',
],
fromBeginning: true,
});
await this.consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
const dueAt = Number(message.headers?.['x-due-at'] ?? 0);
const now = Date.now();
if (dueAt > now) {
this.consumer.pause([{ topic, partitions: [partition] }]);
setTimeout(() => {
this.consumer.resume([{ topic, partitions: [partition] }]);
}, dueAt - now);
return;
}
const attempt = Number(message.headers?.['x-attempt'] ?? 1);
const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
try {
await this.billingService.charge(event.orderId, event.totalAmount);
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
} catch (err) {
await this.retryRouter.route({
topic: message.headers?.['x-original-topic']?.toString() ?? topic,
message,
attempt,
err,
producer: this.kafka.producer,
});
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
}
},
});
}
async onApplicationShutdown() {
await this.consumer.disconnect();
}
}
consumer.pause() приостанавливает poll конкретной partition, не блокируя event loop и другие partition'ы. setTimeout возобновляет poll ровно тогда, когда due-time наступил.
DLQ monitoring
R-KFK-RTRY-3: alert на размер DLQ — обязателен.
// dlq-monitor.service.ts
@Injectable()
export class DlqMonitorService implements OnApplicationBootstrap {
private readonly dlqTopics = [
'orders.confirmed.dlq',
'payments.initiated.dlq',
'customers.registered.dlq',
];
constructor(
private readonly kafka: KafkaClient,
private readonly metrics: MetricsService,
) {}
async onApplicationBootstrap() {
const admin = this.kafka.instance.admin();
await admin.connect();
setInterval(async () => {
for (const topic of this.dlqTopics) {
const offsets = await admin.fetchTopicOffsets(topic);
const lag = offsets.reduce((sum, p) => sum + Number(p.high) - Number(p.low), 0);
this.metrics.gauge('kafka_dlq_size', lag, { topic });
}
}, 30_000);
}
}
Prometheus-alert (пример для orders):
- alert: KafkaDlqBacklog
expr: kafka_dlq_size{topic="orders.confirmed.dlq"} > 1
for: 5m
annotations:
summary: "DLQ {{ $labels.topic }} содержит необработанные события"
runbook: https://runbooks.internal/kafka-dlq
Threshold зависит от критичности: для money-событий (orders, payments) — > 1 (одно событие = инцидент); для аналитики — > 1000.
Replay из DLQ
R-KFK-RTRY-4: ручная операция, не автоматическая.
// dlq-replay.controller.ts
@Controller('admin/dlq')
export class DlqReplayController {
constructor(private readonly replayService: DlqReplayService) {}
@Post(':topic/replay')
async replay(
@Param('topic') topic: string,
@Body() dto: ReplayRequestDto,
): Promise<ReplayResultDto> {
return this.replayService.replay(topic, dto);
}
}
// dlq-replay.service.ts
@Injectable()
export class DlqReplayService {
constructor(private readonly kafka: KafkaClient) {}
async replay(dlqTopic: string, dto: ReplayRequestDto): Promise<ReplayResultDto> {
const originalTopic = dlqTopic.replace(/\.dlq$/, '');
const consumer = this.kafka.instance.consumer({ groupId: `dlq-replay-${Date.now()}` });
const producer = this.kafka.producer;
await consumer.connect();
await consumer.subscribe({ topic: dlqTopic, fromBeginning: true });
const replayed: string[] = [];
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
const eventId = message.headers?.['x-event-id']?.toString();
if (dto.eventIds && !dto.eventIds.includes(eventId!)) return;
await producer.send({
topic: originalTopic,
acks: -1,
messages: [
{
key: message.key,
value: message.value,
headers: { ...message.headers, 'x-attempt': '1', 'x-replayed-from-dlq': 'true' },
},
],
});
replayed.push(eventId ?? message.offset.toString());
await consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
},
});
await consumer.disconnect();
return { replayed };
}
}
Почему replay не автоматический:
- В DLQ может оказаться событие с багом в обработчике. Автоматический возврат в основной топик → снова баг → снова DLQ. Бесконечный цикл.
- Corrupted payload — replay бессмыслен, нужен
DELETE. - Плановое восстановление после инцидента (payment-provider вернулся через 6 часов) — операционное решение, а не автоматика.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
await sleep(N) / setTimeout-цикл в eachMessage | R-KFK-RTRY-X1 | публикация в retry-топик + отдельный retry-consumer |
catch (e) { logger.error(e); } + commitOffsets | R-KFK-RTRY-X2 | throw / публикация в retry-топик или DLQ явно |
Retry-топик без проверки x-attempt | R-KFK-RTRY-X3 | проверка x-attempt >= MAX_ATTEMPTS → DLQ |
| DLQ без мониторинга | R-KFK-RTRY-X4 | kafka_dlq_size метрика + Prometheus alert |
| Retry на 4xx / zod-ошибку | R-KFK-RTRY-2 | isTransient(err) === false → сразу DLQ |
| Авто-replay из DLQ | R-KFK-RTRY-4 | ручная операция через admin/dlq/:topic/replay |
autoCreateTopics в проде | R-KFK-RTRY-1 | явное создание топиков |
| Один retry-топик на весь сервис | R-KFK-RTRY-1 | per-consumer retry-топики |
Куда дальше
- Consumer — почему blocking retry ломает poll-цикл и как работает
consumer.pause(). - Idempotent consumer — replay из DLQ создаёт дубль; dedup через
processed_event. - Observability — consumer lag, DLQ size alerts через
prom-client. - Outbox publishing — почему domain-события через outbox, не
producer.sendиз handler. - Producer —
idempotent: true,acks: -1, publish в retry-топик через тот же producer. - Конфигурация —
KafkaConfigс zod, fail-fast на отсутствующий топик. - Event design —
x-attemptиx-due-atкак часть envelope, не payload. - Security — ACL на retry и DLQ топики.