← назад к разделу

Elasticsearch — мощный поисковый движок, но писать напрямую HTTP-запросы его JSON API — это много шаблонного кода. Spring Data Elasticsearch берёт эту работу на себя: вы описываете структуру документа Java-классом, а поиск и индексацию делаете привычными Spring-методами.

Разберём по шагам: как подключить, как описать документ, как искать и как держать индекс в синхронизации с основной базой данных.

Подключение

Добавьте зависимость в build.gradle.kts:

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-elasticsearch")
}

И настройте адрес кластера в application.properties:

spring.elasticsearch.uris=http://elasticsearch:9200
spring.elasticsearch.username=elastic
spring.elasticsearch.password=${ES_PASSWORD}
spring.elasticsearch.connection-timeout=2s
spring.elasticsearch.socket-timeout=10s

Spring Boot автоматически создаст RestClient (HTTP-клиент к Elasticsearch), поверх него — ElasticsearchOperations для удобных запросов, и сканирует пакеты в поисках ElasticsearchRepository-интерфейсов, которые реализует на лету.

Как описать документ через @Document

В JPA вы описываете таблицу через @Entity. В Elasticsearch аналогичную роль играет @Document — он связывает Java-класс с конкретным индексом.

@Document(indexName = "products")
public class ProductDoc {

    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "russian")
    private String name;

    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "russian"),
        otherFields = {
            @InnerField(suffix = "raw", type = FieldType.Keyword)
        }
    )
    private String description;

    @Field(type = FieldType.Long)
    private Long categoryId;

    @Field(type = FieldType.ScaledFloat, scalingFactor = 100)
    private BigDecimal price;

    @Field(type = FieldType.Boolean)
    private Boolean inStock;

    @Field(type = FieldType.Date, format = DateFormat.date_time)
    private Instant createdAt;

    // геттеры и сеттеры
}

Несколько важных деталей:

  • @Field(type = FieldType.Text, analyzer = "russian") — поле полнотекстового поиска с русскоязычным анализатором (стемминг, стоп-слова).
  • @MultiField — одно поле в двух вариантах: Text для поиска по смыслу и Keyword (суффикс .raw) для точных фильтров и сортировки.
  • @ScaledFloat — рекомендуемый тип для цен, хранит число как целое с масштабом (100 → копейки).
  • Это не JPA: здесь нет @Entity, @Table, транзакций Hibernate. Класс нужен только для маппинга ES-документа.

Если нужен тонкий контроль маппинга — укажите готовый JSON-файл: @Mapping(mappingPath = "elasticsearch/product-mapping.json").

Простые запросы через ElasticsearchRepository

Самый быстрый старт — объявить интерфейс-репозиторий:

public interface ProductRepository extends ElasticsearchRepository<ProductDoc, String> {

    Page<ProductDoc> findByCategoryId(Long categoryId, Pageable pageable);

    List<ProductDoc> findByNameContainingAndInStockTrue(String namePart);

    long countByPriceBetween(BigDecimal min, BigDecimal max);
}

Spring парсит имена методов и генерирует Query DSL автоматически. Работает для простых случаев: фильтры по точным значениям, сортировка, постраничная выдача.

Когда репозитория уже не хватает:

  • Имена методов разрастаются до 8–10 слов и плохо читаются.
  • Нет контроля над нечётким поиском (fuzziness), весами полей, агрегациями.
  • Нельзя написать сложный bool-запрос с несколькими условиями.

Для таких случаев есть ElasticsearchOperations.

Гибкие запросы через ElasticsearchOperations

ElasticsearchOperations — это низкоуровневый API, который даёт полный контроль над запросами:

@Service
@RequiredArgsConstructor
public class ProductSearchService {

    private final ElasticsearchOperations elasticsearch;

    public SearchHits<ProductDoc> search(String text, Set<Long> categories,
                                          BigDecimal minPrice, BigDecimal maxPrice,
                                          Pageable pageable) {
        var criteria = new Criteria("name").matches(text)
            .and(new Criteria("inStock").is(true));

        if (!categories.isEmpty()) {
            criteria = criteria.and(new Criteria("categoryId").in(categories));
        }
        if (minPrice != null) {
            criteria = criteria.and(new Criteria("price").greaterThanEqual(minPrice));
        }
        if (maxPrice != null) {
            criteria = criteria.and(new Criteria("price").lessThanEqual(maxPrice));
        }

        var query = new CriteriaQuery(criteria, pageable);
        return elasticsearch.search(query, ProductDoc.class);
    }
}

Когда даже CriteriaQuery не хватает — например, нужны агрегации или функция затухания по дате — используют NativeQuery. Это уже практически raw JSON, но с проверкой типов на этапе компиляции:

public SearchHits<ProductDoc> searchWithFunctionScore(String text) {
    var query = NativeQuery.builder()
        .withQuery(q -> q
            .functionScore(fs -> fs
                .query(qq -> qq.match(m -> m.field("name").query(text)))
                .functions(f -> f
                    .gauss(g -> g
                        .field("createdAt")
                        .placement(p -> p.origin("now").scale("30d").decay(0.5))))
            ))
        .build();
    return elasticsearch.search(query, ProductDoc.class);
}

Массовая индексация

Индексировать документы по одному — медленно: каждый вызов это отдельный HTTP-запрос плюс ожидание подтверждения. При загрузке большого каталога так не работают.

Bulk API позволяет передать тысячи документов одним запросом:

@Service
@RequiredArgsConstructor
public class ProductIndexer {

    private final ElasticsearchOperations elasticsearch;

    public void reindexAll(List<ProductDoc> docs) {
        var queries = docs.stream()
            .map(doc -> new IndexQueryBuilder()
                .withId(doc.getId())
                .withObject(doc)
                .build())
            .toList();

        elasticsearch.bulkIndex(queries, ProductDoc.class);
    }
}

Для очень больших объёмов (миллионы документов) разбивайте на пачки по 500–5000 и временно отключайте автообновление индекса (refresh_interval), чтобы не тратить ресурсы на промежуточные версии:

public void bulkReindex(List<ProductDoc> docs) {
    var indexOps = elasticsearch.indexOps(ProductDoc.class);

    indexOps.putSettings(Map.of("index.refresh_interval", "-1"));

    try {
        Lists.partition(docs, 5000).forEach(this::reindexAll);
    } finally {
        indexOps.putSettings(Map.of("index.refresh_interval", "1s"));
        indexOps.refresh();
    }
}

Прирост скорости по сравнению с поштучной индексацией — в 10–50 раз.

Как держать индекс актуальным: четыре подхода

Elasticsearch в большинстве приложений — не основная база данных, а поисковый индекс рядом с PostgreSQL. Данные появляются в PostgreSQL, и нужно своевременно отражать их в ES. Есть четыре способа это организовать.

Двойная запись — просто, но ненадёжно

Самый очевидный вариант: пишем в PostgreSQL и в ES в одном методе.

@Transactional
public void save(Product product) {
    productRepo.save(product);              // PostgreSQL
    elasticsearch.save(toDoc(product));     // Elasticsearch
}

Проблема: транзакция PostgreSQL и HTTP-запрос к ES — это две разные операции. Если PostgreSQL закоммитил, а ES вернул ошибку сети — данные разошлись, и неясно, как их привести обратно. Подходит только для прототипов, не для продакшена.

Transactional Outbox — надёжно, но нужна инфраструктура

Внутри транзакции PostgreSQL вместе с бизнес-данными записываем событие в специальную таблицу outbox. Отдельный процесс-читатель берёт события из этой таблицы и отправляет в ES.

@Transactional
public void save(Product product) {
    productRepo.save(product);
    outboxRepo.save(new OutboxEvent(
        UUID.randomUUID(),
        "product.updated",
        toJson(product)
    ));
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishToEs() {
    var batch = outboxRepo.fetchUnpublished(500);
    for (var event : batch) {
        var doc = parseProductDoc(event.payload());
        elasticsearch.save(doc);
        outboxRepo.markPublished(event.id());
    }
}

Плюсы: данные не разойдутся — событие записано атомарно вместе с бизнес-данными; при сбое ES событие останется непрочитанным и будет повторено. Минусы: нужна таблица outbox, логика читателя и мониторинг отставания.

Подробнее об этом паттерне — в разделе Распределённые паттерны.

CDC через Debezium → Kafka → ES — промышленный вариант

Change Data Capture (CDC) — подход, при котором ваш сервис вообще не знает про Elasticsearch. Он просто пишет в PostgreSQL.

PostgreSQL  →  Debezium  →  Kafka  →  Kafka Connect ES Sink  →  Elasticsearch
  (WAL)

Debezium читает журнал транзакций PostgreSQL (WAL) и публикует каждое изменение как событие в Kafka. Коннектор Elasticsearch Sink читает эти события и вставляет/обновляет/удаляет документы в ES.

Преимущества: сервис не привязан к ES; ловятся все изменения, в том числе прямые правки в базе; при сбое коннектор продолжает с последней позиции. Задержка индексации — обычно 100–500 миллисекунд. Недостаток — нужно развернуть и поддерживать Debezium, Kafka и Kafka Connect.

Полная переиндексация по расписанию

Если данные меняются редко (справочники, каталоги), можно просто пересоздавать весь индекс ночью:

@Scheduled(cron = "0 0 3 * * *", zone = "Europe/Moscow")
public void reindexCatalog() {
    var newIndex = "products-v" + System.currentTimeMillis();
    elasticsearch.indexOps(IndexCoordinates.of(newIndex)).create();

    productRepo.findAll().forEach(product ->
        elasticsearch.save(toDoc(product), IndexCoordinates.of(newIndex))
    );

    // переключаем alias: запросы к "products" теперь идут в новый индекс
    elasticsearch.indexOps(IndexCoordinates.of("products")).alias(
        new AliasActions().add(new AliasAction.Add(
            AliasActionParameters.builderForAdd()
                .withIndices(newIndex).withAliases("products").build()
        ))
    );
}

Прост в реализации и обслуживании. Не подходит, когда обновления нужно видеть в поиске в реальном времени.

Aliases — переключение индексов без простоя

Схему Elasticsearch нельзя изменить на месте: если нужно переименовать поле или сменить тип — придётся создать новый индекс и перелить данные. Чтобы при этом приложение продолжало работать, используют алиасы.

Алиас — это указатель на индекс. Приложение обращается к алиасу products, не зная, какой именно индекс за ним стоит.

# создаём индекс и привязываем к нему алиас
PUT /products-v1
POST /_aliases
{
  "actions": [
    { "add": { "index": "products-v1", "alias": "products" } }
  ]
}

# когда нужно сменить схему: создаём products-v2, переливаем данные, переключаем
POST /_aliases
{
  "actions": [
    { "remove": { "index": "products-v1", "alias": "products" } },
    { "add":    { "index": "products-v2", "alias": "products" } }
  ]
}

Переключение алиаса атомарно: в один момент все запросы перейдут на новый индекс без каких-либо ошибок на стороне приложения.

Типичные ловушки

Elasticsearch не транзакционен

elasticsearch.save(doc) сохраняет документ немедленно и не откатывается вместе с транзакцией PostgreSQL. Это фундаментальное ограничение, и все подходы синхронизации выше построены именно вокруг него.

Документ не виден сразу после сохранения

По умолчанию ES обновляет поисковый индекс раз в секунду. Если в интеграционном тесте вы ищете документ сразу после его сохранения — он может ещё не появиться. Решение для тестов:

elasticsearch.save(doc, RefreshPolicy.WAIT_UNTIL);  // дождаться обновления индекса

В продакшене это дорого — там лучше принять задержку как данность.

Версия клиента и версия кластера должны совпадать

Клиент elasticsearch-java версии 8.x работает с кластером 8.x. При обновлении кластера — обновляйте и клиента.

Тип поля важно задать сразу

Если поле categoryId описать как Keyword вместо Long, его нужно будет передавать строкой. Позже изменить тип без пересоздания индекса нельзя. Сразу задавайте правильные типы.

Размер документа

Elasticsearch плохо работает с документами больше 10 МБ. Если нужно хранить большие тексты — разбивайте на части или храните в отдельном хранилище, в ES оставляя только поисковые поля.

Коротко

  • spring-boot-starter-data-elasticsearch автоматически настраивает клиента и репозитории.
  • @Document описывает индекс, @Field — типы полей; @MultiField — поле в двух вариантах для поиска и точной фильтрации.
  • ElasticsearchRepository подходит для простых запросов по именам методов; ElasticsearchOperations — для сложных запросов с полным контролем.
  • Bulk API быстрее поштучной индексации в 10–50 раз; при большом объёме временно отключайте refresh_interval.
  • Двойная запись ненадёжна; для продакшена — Transactional Outbox или CDC через Debezium.
  • Используйте алиасы вместо прямых имён индексов — это даёт возможность менять схему без простоя.
  • После сохранения документ становится видимым в поиске примерно через секунду — учитывайте в тестах.

Что почитать дальше

  • Fundamentals — устройство индекса, шарды, репликация.
  • Query DSL и relevance — что генерирует Spring Data Elasticsearch под капотом.
  • Operations — ILM, снэпшоты, настройка кластера.
  • Распределённые паттерны — подробнее об Outbox и CDC.