Опирается на правила: R-KFK-CFG-1R-KFK-CFG-4 и R-KFK-CFG-X1R-KFK-CFG-X2 из Kafka Style Guide → раздел 7. Конфигурация.

Важно знать

  • @ConfigurationProperties + @Validated для Kafka settings (см. R-VLD-CFG-*).
  • bootstrap-servers через env (${KAFKA_BROKERS}) — никакого hard-code.
  • Producer: enable.idempotence: true, acks: all.
  • Consumer: auto-offset-reset: earliest, enable-auto-commit: false.
  • Listener: ack-mode: MANUAL_IMMEDIATE, missing-topics-fatal: true.
  • spring.json.trusted.packages — explicit allow-list. '*' — security risk.
  • missing-topics-fatal: true в проде — fail-fast на отсутствующем топике.

Конфигурация Kafka — место, где малая ошибка стоит больно. enable.idempotence: false → дубликаты во всём пайплайне. trusted.packages: '*' → RCE через deserialization. missing-topics-fatal: false → consumer запускается на несуществующем топике, тихо бездействует, никто не замечает неделями.

@ConfigurationProperties + @Validated

R-KFK-CFG-1: типизированные настройки с валидацией.

@ConfigurationProperties("kafka")
@Validated
public record KafkaSettings(
    @NotBlank String bootstrapServers,
    @NotNull Producer producer,
    @NotEmpty Map<String, ConsumerConfig> consumers
) {
    public record Producer(
        @NotNull Duration requestTimeout,
        @Min(0) int retryBackoffMs
    ) {}

    public record ConsumerConfig(
        @NotBlank String groupId,
        @NotEmpty List<@NotBlank String> topics,
        @Min(1) int concurrency,
        @NotNull Duration maxPollInterval
    ) {}
}

@Configuration
@EnableConfigurationProperties(KafkaSettings.class)
public class KafkaSettingsRegistration {}

Что это даёт:

  • При старте Spring проверяет все @NotBlank/@NotNull — если в application.yml забыли bootstrap-servers — fail-fast.
  • IDE даёт autocomplete на application.yml.
  • В коде есть типизированный access вместо @Value("${kafka.bootstrap-servers}").

Подробнее — Validation → @ConfigurationProperties.

application.yml

R-KFK-CFG-2: полный пример.

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
    producer:
      acks: all
      properties:
        enable.idempotence: true
        request.timeout.ms: 30000
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: ru.example.events
        spring.json.use.type.headers: false
        max.poll.records: 100
        max.poll.interval.ms: 600000
    listener:
      ack-mode: MANUAL_IMMEDIATE
      missing-topics-fatal: true
      observation-enabled: true

Каждая опция объяснена в соответствующих разделах:

  • enable.idempotence, acks — Producer.
  • auto-offset-reset, enable-auto-commit, ack-mode — Consumer.
  • observation-enabled — Observability.

trusted.packages explicit allow-list

R-KFK-CFG-3: какие пакеты можно десериализовать.

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: ru.example.events,ru.example.shared.events

JsonDeserializer использует Jackson + проверку trusted-packages. Если в Kafka headers пришёл __TypeId__: ru.evil.RemoteCodeExec и пакет не в trusted — deserialization отказывается, бросает IllegalArgumentException.

Без allow-list (или с '*') attacker может через произвольную JSON отправить gadget chain, который при deserialization выполнит код в JVM. CVE на эту тему — десятки.

В нашем коде events объявлены в ru.example.events.* — этот пакет разрешён. Всё остальное — блокировано.

Дополнительно spring.json.use.type.headers: false отключает чтение __TypeId__ из headers — deserialize строго в тип, объявленный в listener-сигнатуре:

@KafkaListener(...)
public void handle(OrderConfirmedEvent event) {
}

Если в Kafka headers __TypeId__: ru.evil.Foo, но use.type.headers: false — Spring игнорирует, deserialize в OrderConfirmedEvent. Это ещё один уровень защиты.

missing-topics-fatal: true

R-KFK-CFG-4: fail-fast на старте.

spring:
  kafka:
    listener:
      missing-topics-fatal: true

При старте Spring проверяет: все ли топики, на которые @KafkaListener, существуют в Kafka. Если нет — BeanCreationException, сервис не стартует.

Сценарий без этого флага:

  1. Деплой v1.5 с новым @KafkaListener(topics = "user.events").
  2. SRE забыл создать топик user.events в проде.
  3. Сервис стартует, listener молча создаёт consumer-group на несуществующем топике.
  4. Никаких событий не приходит, никаких алертов.
  5. Через неделю QA замечает «новый функционал не работает».

С missing-topics-fatal: true — сервис не стартует, K8s показывает CrashLoopBackOff, SRE сразу видит «топика нет».

В dev/test может быть false (топики создаются динамически), но в prod/staging — всегда true.

Что запрещено

spring.json.trusted.packages: '*'

R-KFK-CFG-X1: security risk.

# КАТАСТРОФА
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'

Любой класс из classpath может быть инстанцирован через JSON deserialization. Известные gadget chains позволяют выполнить произвольный код (например ysoserial payloads). RCE через Kafka сообщение.

Всегда explicit пакеты.

bootstrap-servers hard-coded

R-KFK-CFG-X2: невозможно catch разные кластеры.

# ПЛОХО
spring.kafka.bootstrap-servers: kafka-prod-1.internal:9092,kafka-prod-2.internal:9092

# ХОРОШО — через env
spring.kafka.bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}

Hard-coded — нельзя поднять test environment без редактирования config. CI/CD деплоит один образ в test и prod с разным KAFKA_BROKERS env var.

Также не hard-code credentials, ACL keys — всегда через env / Vault.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
spring.json.trusted.packages: '*'R-KFK-CFG-X1explicit allow-list пакетов
bootstrap-servers hard-codedR-KFK-CFG-X2${KAFKA_BROKERS} через env
missing-topics-fatal: false в продеR-KFK-CFG-4true — fail-fast
Settings без @ValidatedR-KFK-CFG-1@ConfigurationProperties + @Validated
@Value("${kafka.bootstrap-servers}") россыпьюR-KFK-CFG-1типизированные records
enable-auto-commit: true в configR-KFK-CFG-2false + manual ack
auto-offset-reset: latest для criticalR-KFK-CFG-2earliest
use.type.headers: true без причиныR-KFK-CFG-3false — strict typing на listener
Credentials в application.ymlR-KFK-CFG-X2env / Vault

Куда дальше