Опирается на правила:
R-KFK-CFG-1…R-KFK-CFG-4иR-KFK-CFG-X1…R-KFK-CFG-X2из Kafka Rules → раздел 7. Конфигурация.
Важно знать
KafkaConfig— класс с декораторамиclass-validator, собирается черезConfigModule.forRoot+@IsNotEmpty; никакихprocess.env.KAFKA_BROKERSроссыпью.KAFKA_BROKERS— обязательная переменная среды; хардкод строки брокеров недопустим даже вKafkaModule.kafka.producer({ idempotent: true, maxInFlightRequests: 5 })— опции задаются в конструкторе, не в точке вызова.consumer.run({ autoCommit: false })— явно; дефолт kafkajs —autoCommit: true, это ловушка.- Десериализация — статический
Map<string, ZodSchema>поeventType; динамическийrequireпо строке из payload — запрещён.- Fail-fast:
admin.fetchTopicMetadata({ topics })вonApplicationBootstrap; отсутствующий топик → выброс исключения, процесс не принимает трафик.clientId— per-service, обязателен для ACL-идентификации (R-KFK-SEC-2).- SSL + SASL-параметры — через env / Vault, никогда в коде.
В NestJS нет встроенного Kafka-конфига, который собирается из @Module автоматически с валидацией — всё задаётся вручную. Это даёт гибкость, но создаёт риски: строки брокеров в коде, autoCommit: true по умолчанию, молчаливый старт на несуществующем топике, динамический require по имени из payload. Три идиомы закрывают эти риски: типизированный KafkaConfig с валидацией, статический реестр zod-схем и проверка топиков через admin-клиент на старте.
KafkaConfig через class-validator
R-KFK-CFG-1: параметры через типизированный валидируемый класс.
// infra/config/kafka.config.ts
import { IsNotEmpty, IsString, IsNumber, Min, ValidateNested } from 'class-validator';
import { Type } from 'class-transformer';
export class KafkaProducerConfig {
@IsNumber() @Min(1000)
requestTimeoutMs: number;
@IsNumber() @Min(0)
retryBackoffMs: number;
}
export class KafkaConsumerConfig {
@IsString() @IsNotEmpty()
groupId: string;
@IsNumber() @Min(1)
sessionTimeout: number;
@IsNumber() @Min(1)
rebalanceTimeout: number;
}
export class KafkaConfig {
@IsString() @IsNotEmpty()
clientId: string;
@IsString() @IsNotEmpty()
brokers: string;
@ValidateNested() @Type(() => KafkaProducerConfig)
producer: KafkaProducerConfig;
@ValidateNested() @Type(() => KafkaConsumerConfig)
consumer: KafkaConsumerConfig;
}
// app.module.ts
import { ConfigModule, ConfigService } from '@nestjs/config';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
ConfigModule.forRoot({
validate: (config: Record<string, unknown>) => {
const validated = plainToInstance(KafkaConfig, {
clientId: config['KAFKA_CLIENT_ID'],
brokers: config['KAFKA_BROKERS'],
producer: {
requestTimeoutMs: Number(config['KAFKA_PRODUCER_REQUEST_TIMEOUT_MS'] ?? 30000),
retryBackoffMs: Number(config['KAFKA_PRODUCER_RETRY_BACKOFF_MS'] ?? 500),
},
consumer: {
groupId: config['KAFKA_CONSUMER_GROUP_ID'],
sessionTimeout: Number(config['KAFKA_CONSUMER_SESSION_TIMEOUT'] ?? 30000),
rebalanceTimeout: Number(config['KAFKA_CONSUMER_REBALANCE_TIMEOUT'] ?? 60000),
},
});
const errors = validateSync(validated, { skipMissingProperties: false });
if (errors.length) throw new Error(errors.toString());
return validated;
},
});
Что это даёт:
- При старте NestJS проверяет все
@IsNotEmpty/@IsNumber— еслиKAFKA_BROKERSне задан, процесс падает немедленно. - TypeScript знает тип конфига во всём DI-дереве.
- IDE подсказывает поля вместо строковых ключей
process.env.
Подробнее — Validation → ConfigModule.
Сборка Kafka-клиента из конфига
R-KFK-CFG-2: клиент собирается из конфига один раз в KafkaModule, producer/consumer-опции не разбросаны по сервисам.
// infra/kafka/kafka.module.ts
import { Module, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, Admin } from 'kafkajs';
import { KAFKA_PRODUCER, KAFKA_ADMIN } from './kafka.tokens';
import { EXPECTED_TOPICS } from './kafka.topics';
@Module({
providers: [
{
provide: Kafka,
inject: [ConfigService],
useFactory: (cfg: ConfigService<KafkaConfig>) => new Kafka({
clientId: cfg.get('clientId', { infer: true }),
brokers: cfg.get('brokers', { infer: true }).split(','),
ssl: cfg.get('ssl', { infer: true }),
sasl: cfg.get('sasl', { infer: true }),
}),
},
{
provide: KAFKA_PRODUCER,
inject: [Kafka, ConfigService],
useFactory: async (kafka: Kafka, cfg: ConfigService<KafkaConfig>) => {
const producer = kafka.producer({
idempotent: true,
maxInFlightRequests: 5,
transactionTimeout: cfg.get('producer.requestTimeoutMs', { infer: true }),
});
await producer.connect();
return producer;
},
},
{
provide: KAFKA_ADMIN,
inject: [Kafka],
useFactory: async (kafka: Kafka) => {
const admin = kafka.admin();
await admin.connect();
return admin;
},
},
],
exports: [Kafka, KAFKA_PRODUCER, KAFKA_ADMIN],
})
export class KafkaModule implements OnApplicationBootstrap, OnApplicationShutdown {
constructor(
@Inject(KAFKA_PRODUCER) private readonly producer: Producer,
@Inject(KAFKA_ADMIN) private readonly admin: Admin,
) {}
async onApplicationBootstrap(): Promise<void> {
const metadata = await this.admin.fetchTopicMetadata({ topics: EXPECTED_TOPICS });
const missing = EXPECTED_TOPICS.filter(
(t) => !metadata.topics.some((m) => m.name === t),
);
if (missing.length) {
throw new Error(`Kafka topics not found: ${missing.join(', ')}`);
}
}
async onApplicationShutdown(): Promise<void> {
await this.producer.disconnect();
await this.admin.disconnect();
}
}
// infra/kafka/kafka.topics.ts
export const EXPECTED_TOPICS = [
'orders.confirmed',
'orders.confirmed.retry.1m',
'orders.confirmed.retry.10m',
'orders.confirmed.dlq',
'products.updated',
] as const;
Статический реестр zod-схем
R-KFK-CFG-3: allow-list десериализации — статический Map<string, ZodSchema>, не динамический require/import по строке из payload.
// infra/kafka/schema-registry.ts
import { z } from 'zod';
const orderConfirmedSchema = z.object({
eventId: z.string().uuid(),
eventType: z.literal('OrderConfirmed'),
occurredAt: z.string().datetime(),
aggregateType: z.literal('Order'),
aggregateId: z.string().uuid(),
customerId: z.string().uuid(),
totalAmount: z.number().positive(),
currency: z.string().length(3),
});
const productUpdatedSchema = z.object({
eventId: z.string().uuid(),
eventType: z.literal('ProductUpdated'),
occurredAt: z.string().datetime(),
aggregateType: z.literal('Product'),
aggregateId: z.string().uuid(),
sku: z.string().min(1),
priceAmount: z.number().positive(),
});
export const SCHEMA_REGISTRY = new Map([
['OrderConfirmed', orderConfirmedSchema],
['ProductUpdated', productUpdatedSchema],
]);
export type OrderConfirmedEvent = z.infer<typeof orderConfirmedSchema>;
export type ProductUpdatedEvent = z.infer<typeof productUpdatedSchema>;
// использование в consumer
import { SCHEMA_REGISTRY } from '../schema-registry';
eachMessage: async ({ message }) => {
const raw = JSON.parse(message.value!.toString()) as { eventType: string };
const schema = SCHEMA_REGISTRY.get(raw.eventType);
if (!schema) throw new Error(`Unknown eventType: ${raw.eventType}`);
const event = schema.parse(raw);
await handler.handle(event);
};
Почему статический реестр, а не require(raw.eventType):
- Динамический
requireпо строке из payload открывает вектор атаки: attacker кладёт вeventTypeпуть к системному модулю или к гаджет-цепочке изnode_modules. Это Node-аналогtrusted.packages: '*'из Spring. - Статический
Map— explicitly-allowed list: еслиeventTypeне в реестре —schemaравенundefined, и обработчик отказывает с ошибкой. Никакой магии. - zod-схема проверяет структуру и типы на границе consumer, а не в глубине домена.
Fail-fast: fetchTopicMetadata на старте
R-KFK-CFG-4: сервис не принимает трафик, пока не убедится что все ожидаемые топики существуют.
// Уже показано в KafkaModule.onApplicationBootstrap выше.
// Вот изолированная логика для ясности:
async function assertTopicsExist(admin: Admin, topics: readonly string[]): Promise<void> {
const metadata = await admin.fetchTopicMetadata({ topics: [...topics] });
const existing = new Set(metadata.topics.map((t) => t.name));
const missing = topics.filter((t) => !existing.has(t));
if (missing.length) {
throw new Error(`[Kafka] Required topics missing: ${missing.join(', ')}`);
}
}
Сценарий без проверки:
- Деплой нового consumer в
orders.confirmed.retry.10m. - SRE не создал топик
orders.confirmed.retry.10mв продакшене. - NestJS стартует, consumer-group регистрируется, сообщений нет.
- Retry-логика молча не работает — никаких алертов.
- Через несколько дней пользователи сообщают о «пропавших» заказах.
С assertTopicsExist — процесс бросает исключение в onApplicationBootstrap, K8s видит CrashLoopBackOff, on-call получает алерт в течение минуты.
В dev-окружении топики создаются автоматически (allowAutoTopicCreation: true на брокере) — проверка пройдёт. В продакшене автосоздание отключено — проверка поймает несоответствие.
Consumer: autoCommit: false — явно
// infra/kafka/order-confirmed.consumer.ts
import { Injectable, OnApplicationBootstrap, Inject } from '@nestjs/common';
import { Kafka, Consumer } from 'kafkajs';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class OrderConfirmedConsumer implements OnApplicationBootstrap {
private consumer: Consumer;
constructor(
private readonly kafka: Kafka,
private readonly cfg: ConfigService<KafkaConfig>,
private readonly handler: ConfirmOrderPaymentHandler,
) {}
async onApplicationBootstrap(): Promise<void> {
this.consumer = this.kafka.consumer({
groupId: this.cfg.get('consumer.groupId', { infer: true }),
sessionTimeout: this.cfg.get('consumer.sessionTimeout', { infer: true }),
rebalanceTimeout: this.cfg.get('consumer.rebalanceTimeout', { infer: true }),
});
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
await this.consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const raw = JSON.parse(message.value!.toString()) as { eventType: string };
const schema = SCHEMA_REGISTRY.get(raw.eventType);
if (!schema) throw new Error(`Unknown eventType: ${raw.eventType}`);
const event = schema.parse(raw) as OrderConfirmedEvent;
await heartbeat();
await this.handler.handle(event);
await this.consumer.commitOffsets([{
topic, partition, offset: String(Number(message.offset) + 1),
}]);
},
});
}
}
autoCommit: false — явная строка, без неё kafkajs использует autoCommit: true по умолчанию и коммитит offset по таймеру независимо от того, завершилась обработка или нет. Crash между commit и успешной обработкой = потеря сообщения. Подробнее — Consumer.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
dynamic require(eventType) или import(eventType) по строке из payload | R-KFK-CFG-X1 | статический SCHEMA_REGISTRY: Map<string, ZodSchema> |
brokers: ['kafka-prod:9092'] хардкодом в коде | R-KFK-CFG-X2 | process.env.KAFKA_BROKERS через ConfigService + @IsNotEmpty |
autoCommit: true (дефолт kafkajs) в продакшене | R-KFK-CFG-2 | явный autoCommit: false + commitOffsets |
Отсутствие assertTopicsExist на старте | R-KFK-CFG-4 | fetchTopicMetadata в onApplicationBootstrap |
process.env.X россыпью по модулям | R-KFK-CFG-1 | KafkaConfig с class-validator, ConfigService |
Credentials в .env-файле в репозитории | R-KFK-CFG-X2 | env через CI/CD secrets / Vault |
idempotent: false или maxInFlightRequests > 5 | R-KFK-PROD-1 | { idempotent: true, maxInFlightRequests: 5 } |
ssl: false в продакшене | R-KFK-SEC-1 | ssl: true + sasl в KafkaConfig |
Куда дальше
- Producer —
idempotent: true,acks: -1, partition key. - Consumer —
autoCommit: false,groupId,fromBeginning. - Event design — форма payload, zod-схема, версионирование.
- Idempotent consumer —
SCHEMA_REGISTRYв связке сprocessed_event. - Outbox publishing — relay через
@Interval,setLock. - Retry and DLQ — retry-топики,
x-attempt, DLQ-алерты. - Observability —
prom-client, lag-алерты,traceparent. - Security — SSL/SASL, ACL,
clientId.