---
title: "Конфигурация Kafka в Node — KafkaConfig, zod и реестр zod-схем"
nav_title: "Конфигурация"
excerpt: "KafkaConfig через class-validator, статический реестр zod-схем вместо dynamic require, fail-fast через admin.fetchTopicMetadata — Node/NestJS-идиомы для R-KFK-CFG-1..4."
keywords: "kafkajs NestJS configuration, KafkaConfig class-validator, kafkajs zod schema registry, admin fetchTopicMetadata, R-KFK-CFG, NestJS kafka config"
focus_keyword: "kafkajs NestJS KafkaConfig configuration"
tags: ["kafka", "node", "typescript", "nestjs", "configuration", "R-KFK-CFG"]
---

# Конфигурация Kafka в Node — KafkaConfig, zod и реестр zod-схем

> **Опирается на правила:** `R-KFK-CFG-1` … `R-KFK-CFG-4` и `R-KFK-CFG-X1` … `R-KFK-CFG-X2` из Kafka Rules → [раздел 7. Конфигурация](/standards/backend/kafka/#7-конфигурация).

> **Важно знать**
> - `KafkaConfig` — класс с декораторами `class-validator`, собирается через `ConfigModule.forRoot` + `@IsNotEmpty`; никаких `process.env.KAFKA_BROKERS` россыпью.
> - `KAFKA_BROKERS` — обязательная переменная среды; хардкод строки брокеров недопустим даже в `KafkaModule`.
> - `kafka.producer({ idempotent: true, maxInFlightRequests: 5 })` — опции задаются в конструкторе, не в точке вызова.
> - `consumer.run({ autoCommit: false })` — явно; дефолт kafkajs — `autoCommit: true`, это ловушка.
> - Десериализация — статический `Map<string, ZodSchema>` по `eventType`; динамический `require` по строке из payload — запрещён.
> - Fail-fast: `admin.fetchTopicMetadata({ topics })` в `onApplicationBootstrap`; отсутствующий топик → выброс исключения, процесс не принимает трафик.
> - `clientId` — per-service, обязателен для ACL-идентификации (`R-KFK-SEC-2`).
> - SSL + SASL-параметры — через env / Vault, никогда в коде.

В NestJS нет встроенного Kafka-конфига, который собирается из `@Module` автоматически с валидацией — всё задаётся вручную. Это даёт гибкость, но создаёт риски: строки брокеров в коде, `autoCommit: true` по умолчанию, молчаливый старт на несуществующем топике, динамический `require` по имени из payload. Три идиомы закрывают эти риски: типизированный `KafkaConfig` с валидацией, статический реестр zod-схем и проверка топиков через admin-клиент на старте.

## `KafkaConfig` через `class-validator`

`R-KFK-CFG-1`: параметры через типизированный валидируемый класс.

```ts
// infra/config/kafka.config.ts
import { IsNotEmpty, IsString, IsNumber, Min, ValidateNested } from 'class-validator';
import { Type } from 'class-transformer';

export class KafkaProducerConfig {
  @IsNumber() @Min(1000)
  requestTimeoutMs: number;

  @IsNumber() @Min(0)
  retryBackoffMs: number;
}

export class KafkaConsumerConfig {
  @IsString() @IsNotEmpty()
  groupId: string;

  @IsNumber() @Min(1)
  sessionTimeout: number;

  @IsNumber() @Min(1)
  rebalanceTimeout: number;
}

export class KafkaConfig {
  @IsString() @IsNotEmpty()
  clientId: string;

  @IsString() @IsNotEmpty()
  brokers: string;

  @ValidateNested() @Type(() => KafkaProducerConfig)
  producer: KafkaProducerConfig;

  @ValidateNested() @Type(() => KafkaConsumerConfig)
  consumer: KafkaConsumerConfig;
}
```

```ts
// app.module.ts
import { ConfigModule, ConfigService } from '@nestjs/config';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';

ConfigModule.forRoot({
  validate: (config: Record<string, unknown>) => {
    const validated = plainToInstance(KafkaConfig, {
      clientId:  config['KAFKA_CLIENT_ID'],
      brokers:   config['KAFKA_BROKERS'],
      producer: {
        requestTimeoutMs: Number(config['KAFKA_PRODUCER_REQUEST_TIMEOUT_MS'] ?? 30000),
        retryBackoffMs:   Number(config['KAFKA_PRODUCER_RETRY_BACKOFF_MS']   ?? 500),
      },
      consumer: {
        groupId:          config['KAFKA_CONSUMER_GROUP_ID'],
        sessionTimeout:   Number(config['KAFKA_CONSUMER_SESSION_TIMEOUT']    ?? 30000),
        rebalanceTimeout: Number(config['KAFKA_CONSUMER_REBALANCE_TIMEOUT']  ?? 60000),
      },
    });
    const errors = validateSync(validated, { skipMissingProperties: false });
    if (errors.length) throw new Error(errors.toString());
    return validated;
  },
});
```

Что это даёт:
- При старте NestJS проверяет все `@IsNotEmpty`/`@IsNumber` — если `KAFKA_BROKERS` не задан, процесс падает немедленно.
- TypeScript знает тип конфига во всём DI-дереве.
- IDE подсказывает поля вместо строковых ключей `process.env`.

Подробнее — [Validation → ConfigModule](/standards/backend/validation/node/configuration-properties/).

## Сборка `Kafka`-клиента из конфига

`R-KFK-CFG-2`: клиент собирается из конфига один раз в `KafkaModule`, producer/consumer-опции не разбросаны по сервисам.

```ts
// infra/kafka/kafka.module.ts
import { Module, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, Admin } from 'kafkajs';
import { KAFKA_PRODUCER, KAFKA_ADMIN } from './kafka.tokens';
import { EXPECTED_TOPICS } from './kafka.topics';

@Module({
  providers: [
    {
      provide: Kafka,
      inject: [ConfigService],
      useFactory: (cfg: ConfigService<KafkaConfig>) => new Kafka({
        clientId: cfg.get('clientId', { infer: true }),
        brokers:  cfg.get('brokers',   { infer: true }).split(','),
        ssl:      cfg.get('ssl',        { infer: true }),
        sasl:     cfg.get('sasl',       { infer: true }),
      }),
    },
    {
      provide: KAFKA_PRODUCER,
      inject: [Kafka, ConfigService],
      useFactory: async (kafka: Kafka, cfg: ConfigService<KafkaConfig>) => {
        const producer = kafka.producer({
          idempotent:           true,
          maxInFlightRequests:  5,
          transactionTimeout:   cfg.get('producer.requestTimeoutMs', { infer: true }),
        });
        await producer.connect();
        return producer;
      },
    },
    {
      provide: KAFKA_ADMIN,
      inject: [Kafka],
      useFactory: async (kafka: Kafka) => {
        const admin = kafka.admin();
        await admin.connect();
        return admin;
      },
    },
  ],
  exports: [Kafka, KAFKA_PRODUCER, KAFKA_ADMIN],
})
export class KafkaModule implements OnApplicationBootstrap, OnApplicationShutdown {
  constructor(
    @Inject(KAFKA_PRODUCER) private readonly producer: Producer,
    @Inject(KAFKA_ADMIN)    private readonly admin: Admin,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    const metadata = await this.admin.fetchTopicMetadata({ topics: EXPECTED_TOPICS });
    const missing = EXPECTED_TOPICS.filter(
      (t) => !metadata.topics.some((m) => m.name === t),
    );
    if (missing.length) {
      throw new Error(`Kafka topics not found: ${missing.join(', ')}`);
    }
  }

  async onApplicationShutdown(): Promise<void> {
    await this.producer.disconnect();
    await this.admin.disconnect();
  }
}
```

```ts
// infra/kafka/kafka.topics.ts
export const EXPECTED_TOPICS = [
  'orders.confirmed',
  'orders.confirmed.retry.1m',
  'orders.confirmed.retry.10m',
  'orders.confirmed.dlq',
  'products.updated',
] as const;
```

## Статический реестр zod-схем

`R-KFK-CFG-3`: allow-list десериализации — статический `Map<string, ZodSchema>`, не динамический `require`/`import` по строке из payload.

```ts
// infra/kafka/schema-registry.ts
import { z } from 'zod';

const orderConfirmedSchema = z.object({
  eventId:       z.string().uuid(),
  eventType:     z.literal('OrderConfirmed'),
  occurredAt:    z.string().datetime(),
  aggregateType: z.literal('Order'),
  aggregateId:   z.string().uuid(),
  customerId:    z.string().uuid(),
  totalAmount:   z.number().positive(),
  currency:      z.string().length(3),
});

const productUpdatedSchema = z.object({
  eventId:       z.string().uuid(),
  eventType:     z.literal('ProductUpdated'),
  occurredAt:    z.string().datetime(),
  aggregateType: z.literal('Product'),
  aggregateId:   z.string().uuid(),
  sku:           z.string().min(1),
  priceAmount:   z.number().positive(),
});

export const SCHEMA_REGISTRY = new Map([
  ['OrderConfirmed',  orderConfirmedSchema],
  ['ProductUpdated',  productUpdatedSchema],
]);

export type OrderConfirmedEvent = z.infer<typeof orderConfirmedSchema>;
export type ProductUpdatedEvent = z.infer<typeof productUpdatedSchema>;
```

```ts
// использование в consumer
import { SCHEMA_REGISTRY } from '../schema-registry';

eachMessage: async ({ message }) => {
  const raw = JSON.parse(message.value!.toString()) as { eventType: string };
  const schema = SCHEMA_REGISTRY.get(raw.eventType);
  if (!schema) throw new Error(`Unknown eventType: ${raw.eventType}`);
  const event = schema.parse(raw);
  await handler.handle(event);
};
```

Почему статический реестр, а не `require(raw.eventType)`:
- Динамический `require` по строке из payload открывает вектор атаки: attacker кладёт в `eventType` путь к системному модулю или к гаджет-цепочке из `node_modules`. Это Node-аналог `trusted.packages: '*'` из Spring.
- Статический `Map` — explicitly-allowed list: если `eventType` не в реестре — `schema` равен `undefined`, и обработчик отказывает с ошибкой. Никакой магии.
- zod-схема проверяет структуру и типы на границе consumer, а не в глубине домена.

## Fail-fast: `fetchTopicMetadata` на старте

`R-KFK-CFG-4`: сервис не принимает трафик, пока не убедится что все ожидаемые топики существуют.

```ts
// Уже показано в KafkaModule.onApplicationBootstrap выше.
// Вот изолированная логика для ясности:

async function assertTopicsExist(admin: Admin, topics: readonly string[]): Promise<void> {
  const metadata = await admin.fetchTopicMetadata({ topics: [...topics] });
  const existing = new Set(metadata.topics.map((t) => t.name));
  const missing  = topics.filter((t) => !existing.has(t));
  if (missing.length) {
    throw new Error(`[Kafka] Required topics missing: ${missing.join(', ')}`);
  }
}
```

Сценарий без проверки:

1. Деплой нового consumer в `orders.confirmed.retry.10m`.
2. SRE не создал топик `orders.confirmed.retry.10m` в продакшене.
3. NestJS стартует, consumer-group регистрируется, сообщений нет.
4. Retry-логика молча не работает — никаких алертов.
5. Через несколько дней пользователи сообщают о «пропавших» заказах.

С `assertTopicsExist` — процесс бросает исключение в `onApplicationBootstrap`, K8s видит `CrashLoopBackOff`, on-call получает алерт в течение минуты.

В dev-окружении топики создаются автоматически (`allowAutoTopicCreation: true` на брокере) — проверка пройдёт. В продакшене автосоздание отключено — проверка поймает несоответствие.

## Consumer: `autoCommit: false` — явно

```ts
// infra/kafka/order-confirmed.consumer.ts
import { Injectable, OnApplicationBootstrap, Inject } from '@nestjs/common';
import { Kafka, Consumer } from 'kafkajs';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class OrderConfirmedConsumer implements OnApplicationBootstrap {
  private consumer: Consumer;

  constructor(
    private readonly kafka: Kafka,
    private readonly cfg: ConfigService<KafkaConfig>,
    private readonly handler: ConfirmOrderPaymentHandler,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    this.consumer = this.kafka.consumer({
      groupId:          this.cfg.get('consumer.groupId', { infer: true }),
      sessionTimeout:   this.cfg.get('consumer.sessionTimeout',   { infer: true }),
      rebalanceTimeout: this.cfg.get('consumer.rebalanceTimeout', { infer: true }),
    });

    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });

    await this.consumer.run({
      autoCommit:                    false,
      partitionsConsumedConcurrently: 3,
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        const raw    = JSON.parse(message.value!.toString()) as { eventType: string };
        const schema = SCHEMA_REGISTRY.get(raw.eventType);
        if (!schema) throw new Error(`Unknown eventType: ${raw.eventType}`);
        const event  = schema.parse(raw) as OrderConfirmedEvent;

        await heartbeat();
        await this.handler.handle(event);

        await this.consumer.commitOffsets([{
          topic, partition, offset: String(Number(message.offset) + 1),
        }]);
      },
    });
  }
}
```

`autoCommit: false` — явная строка, без неё kafkajs использует `autoCommit: true` по умолчанию и коммитит offset по таймеру независимо от того, завершилась обработка или нет. Crash между commit и успешной обработкой = потеря сообщения. Подробнее — [Consumer](/standards/backend/kafka/node/consumer.md).

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `dynamic require(eventType)` или `import(eventType)` по строке из payload | `R-KFK-CFG-X1` | статический `SCHEMA_REGISTRY: Map<string, ZodSchema>` |
| `brokers: ['kafka-prod:9092']` хардкодом в коде | `R-KFK-CFG-X2` | `process.env.KAFKA_BROKERS` через `ConfigService` + `@IsNotEmpty` |
| `autoCommit: true` (дефолт kafkajs) в продакшене | `R-KFK-CFG-2` | явный `autoCommit: false` + `commitOffsets` |
| Отсутствие `assertTopicsExist` на старте | `R-KFK-CFG-4` | `fetchTopicMetadata` в `onApplicationBootstrap` |
| `process.env.X` россыпью по модулям | `R-KFK-CFG-1` | `KafkaConfig` с `class-validator`, `ConfigService` |
| Credentials в `.env`-файле в репозитории | `R-KFK-CFG-X2` | env через CI/CD secrets / Vault |
| `idempotent: false` или `maxInFlightRequests > 5` | `R-KFK-PROD-1` | `{ idempotent: true, maxInFlightRequests: 5 }` |
| `ssl: false` в продакшене | `R-KFK-SEC-1` | `ssl: true` + `sasl` в `KafkaConfig` |

## Куда дальше

- [Producer](/standards/backend/kafka/node/producer.md) — `idempotent: true`, `acks: -1`, partition key.
- [Consumer](/standards/backend/kafka/node/consumer.md) — `autoCommit: false`, `groupId`, `fromBeginning`.
- [Event design](/standards/backend/kafka/node/event-design.md) — форма payload, zod-схема, версионирование.
- [Idempotent consumer](/standards/backend/kafka/node/idempotent-consumer.md) — `SCHEMA_REGISTRY` в связке с `processed_event`.
- [Outbox publishing](/standards/backend/kafka/node/outbox-publishing.md) — relay через `@Interval`, `setLock`.
- [Retry and DLQ](/standards/backend/kafka/node/retry-and-dlq.md) — retry-топики, `x-attempt`, DLQ-алерты.
- [Observability](/standards/backend/kafka/node/observability.md) — `prom-client`, lag-алерты, `traceparent`.
- [Security](/standards/backend/kafka/node/security.md) — SSL/SASL, ACL, `clientId`.
