← назад к разделу

В UCP-стеке Elasticsearch редко источник правды. Чаще — read-only поисковый индекс рядом с PostgreSQL/MongoDB-OLTP. Главные вопросы:

  1. Как делать запросы из Spring (ElasticsearchOperations, ElasticsearchRepository).
  2. Как синхронизировать данные из основной БД в ES (dual write, outbox, CDC).

Эта статья — про оба.

Подключение

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-elasticsearch")
}
spring.elasticsearch.uris=http://elasticsearch:9200
spring.elasticsearch.username=elastic
spring.elasticsearch.password=${ES_PASSWORD}
spring.elasticsearch.connection-timeout=2s
spring.elasticsearch.socket-timeout=10s

Spring Boot создаёт RestClient под капотом (поверх HTTP), плюс ElasticsearchOperations (high-level) и автоматический сканер ElasticsearchRepository-интерфейсов.

Маппинг entity

@Document(indexName = "products")
@Setting(settingPath = "elasticsearch/product-settings.json")
public class ProductDoc {

    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "russian")
    private String name;

    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "russian"),
        otherFields = {
            @InnerField(suffix = "raw", type = FieldType.Keyword)
        }
    )
    private String description;

    @Field(type = FieldType.Long)
    private Long categoryId;

    @Field(type = FieldType.ScaledFloat, scalingFactor = 100)
    private BigDecimal price;

    @Field(type = FieldType.Boolean)
    private Boolean inStock;

    @Field(type = FieldType.Date, format = DateFormat.date_time)
    private Instant createdAt;

    // getters, setters, no @Entity (это не JPA)
}

@Document(indexName = "products") — связывает класс с индексом. @Field — указывает тип и параметры. @MultiField — то же, что multifields в mapping API.

Если хотите более тонкий контроль над mapping — @Mapping(mappingPath = "elasticsearch/product-mapping.json") с явным JSON-файлом.

ElasticsearchRepository — для простых случаев

public interface ProductRepository extends ElasticsearchRepository<ProductDoc, String> {

    Page<ProductDoc> findByCategoryId(Long categoryId, Pageable pageable);

    List<ProductDoc> findByNameContainingAndInStockTrue(String namePart);

    long countByPriceBetween(BigDecimal min, BigDecimal max);
}

Spring парсит имена методов и генерирует Query DSL. Работает для простых запросов: фильтры, sort, paging.

Минусы:

  • Имена методов растут до 8-10 слов, читать тяжело.
  • Нет контроля над scoring, fuzziness, boost.
  • Aggregations не покрываются.

Для serious запросов — ElasticsearchOperations.

ElasticsearchOperations — для серьёзных запросов

@Service
@RequiredArgsConstructor
public class ProductSearchService {

    private final ElasticsearchOperations elasticsearch;

    public SearchHits<ProductDoc> search(String text, Set<Long> categories,
                                          BigDecimal minPrice, BigDecimal maxPrice,
                                          Pageable pageable) {
        var criteria = new Criteria("name").matches(text)
            .and(new Criteria("inStock").is(true));

        if (!categories.isEmpty()) {
            criteria = criteria.and(new Criteria("categoryId").in(categories));
        }
        if (minPrice != null) {
            criteria = criteria.and(new Criteria("price").greaterThanEqual(minPrice));
        }
        if (maxPrice != null) {
            criteria = criteria.and(new Criteria("price").lessThanEqual(maxPrice));
        }

        var query = new CriteriaQuery(criteria, pageable);
        return elasticsearch.search(query, ProductDoc.class);
    }
}

Если нужны фичи, которых нет в CriteriaQuery (function_score, custom aggregations, suggester) — использовать низкоуровневый NativeQuery с лямбдой, формирующей весь DSL:

import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;

public SearchHits<ProductDoc> searchWithFunctionScore(String text) {
    var query = NativeQuery.builder()
        .withQuery(q -> q
            .functionScore(fs -> fs
                .query(qq -> qq.match(m -> m.field("name").query(text)))
                .functions(f -> f
                    .gauss(g -> g
                        .field("createdAt")
                        .placement(p -> p.origin("now").scale("30d").decay(0.5))))
            ))
        .build();
    return elasticsearch.search(query, ProductDoc.class);
}

Это уже близко к написанию raw JSON-DSL, но с type-safety.

Bulk-индексация

Индексировать по одному документу — медленно (HTTP-overhead + переключение refresh'а). На массовых импортах используем bulk:

@Service
@RequiredArgsConstructor
public class ProductIndexer {

    private final ElasticsearchOperations elasticsearch;

    public void reindexAll(List<ProductDoc> docs) {
        var queries = docs.stream()
            .map(doc -> new IndexQueryBuilder()
                .withId(doc.getId())
                .withObject(doc)
                .build())
            .toList();

        elasticsearch.bulkIndex(queries, ProductDoc.class);
    }
}

Bulk даёт 10-50× speedup. Для совсем больших массивов (миллионы документов) — разбивать на пачки по 500-5000 и временно отключать refresh:

public void bulkReindex(List<ProductDoc> docs) {
    var indexOps = elasticsearch.indexOps(ProductDoc.class);

    // отключить refresh
    indexOps.putSettings(Map.of("index.refresh_interval", "-1"));

    try {
        Lists.partition(docs, 5000).forEach(this::reindexAll);
    } finally {
        // вернуть и forced refresh
        indexOps.putSettings(Map.of("index.refresh_interval", "1s"));
        indexOps.refresh();
    }
}

Синхронизация PG/Mongo → ES

Главная архитектурная развилка при работе с ES в UCP-стеке. Четыре подхода:

Подход 1: Dual Write — простой и опасный

В одной транзакции пишем в БД и в ES:

@Transactional
public void save(Product product) {
    productRepo.save(product);              // PG
    elasticsearch.save(toDoc(product));     // ES
}

Проблема: если PG-commit прошёл, а ES-запрос упал — данные разъехались. Восстановить тяжело, нужен periodic reconcile. Антипаттерн для production. Подходит только для прототипов.

Подход 2: Transactional Outbox

В транзакции БД пишем бизнес-данные и событие в таблицу outbox:

@Transactional
public void save(Product product) {
    productRepo.save(product);
    outboxRepo.save(new OutboxEvent(
        UUID.randomUUID(),
        "product.updated",
        toJson(product)
    ));
}

Отдельный процесс читает outbox и индексирует в ES:

@Scheduled(fixedDelay = 1000)
@Transactional
public void publishToEs() {
    var batch = outboxRepo.fetchUnpublished(500);
    for (var event : batch) {
        var doc = parseProductDoc(event.payload());
        elasticsearch.save(doc);
        outboxRepo.markPublished(event.id());
    }
}

Плюсы: atomic с БД, retry бесплатно. Минусы: дополнительная инфраструктура, идемпотентность на стороне ES (но save по _id уже идемпотентен).

Detail про Outbox.

Подход 3: CDC через Debezium → Kafka → ES Sink Connector

Самый production-grade вариант:

PostgreSQL  ──── Debezium ────→ Kafka ────→ Kafka Connect ES Sink ────→ Elasticsearch
  WAL/logical
  replication
  • Debezium читает WAL/logical replication PostgreSQL (или oplog MongoDB), производит change events в Kafka.
  • Elasticsearch Sink Connector читает topic и пишет в ES (insert/update/delete).
  • Никакого кода в основном сервисе — он просто пишет в БД как обычно.

Плюсы: полная развязка, гарантия доставки, ловит все изменения (включая операционные правки руками в БД). Минусы: инфраструктура Debezium + Kafka Connect + monitoring отдельно, latency ~100-500ms.

Это стандартный подход для зрелых UCP-сервисов с серьёзным ES-индексом.

Подход 4: Полный re-index по расписанию

Когда данные меняются редко (каталоги, словари), можно периодически пересоздавать индекс с нуля:

@Scheduled(cron = "0 0 3 * * *", zone = "Europe/Moscow")
public void reindexCatalog() {
    var newIndex = "products-v" + System.currentTimeMillis();
    elasticsearch.indexOps(IndexCoordinates.of(newIndex)).create(/* settings, mapping */);

    productRepo.findAll().forEach(product -> {
        elasticsearch.save(toDoc(product), IndexCoordinates.of(newIndex));
    });

    // alias-переключение: read-pointer "products" теперь смотрит на новый индекс
    elasticsearch.indexOps(IndexCoordinates.of("products")).alias(
        new AliasActions().add(new AliasAction.Add(
            AliasActionParameters.builderForAdd().withIndices(newIndex).withAliases("products").build()
        ))
    );
}

Прост и надёжен для каталогов на ~миллион документов. Не подходит, когда нужна near-real-time индексация.

Aliases — переключение индексов

Никогда не указывайте клиенту реальное имя индекса. Используйте alias — указатель, который можно атомарно переключать:

PUT /products-v1
POST /_aliases
{
  "actions": [
    { "add": { "index": "products-v1", "alias": "products" } }
  ]
}

# приложение читает по alias "products"

# когда надо передеплоить: создаём products-v2, заполняем, потом:
POST /_aliases
{
  "actions": [
    { "remove": { "index": "products-v1", "alias": "products" } },
    { "add":    { "index": "products-v2", "alias": "products" } }
  ]
}

# приложение незаметно для себя начинает читать новый индекс

Это даёт zero-downtime miграции схемы (mapping не позволяет изменить тип — переход через новый индекс + alias).

Ловушки

1. ES не транзакционен

elasticsearch.save(doc) коммитится сразу, не откатывается с @Transactional PostgreSQL. Любой подход синхронизации должен это учитывать.

2. Refresh interval по умолчанию = 1s

Документ, сохранённый секунду назад, может ещё не быть виден в поиске. Если в integration-тесте ищите только что сохранённый документ — нужен elasticsearch.indexOps(...).refresh() или ставьте refresh: WAIT_UNTIL:

elasticsearch.save(doc, RefreshPolicy.WAIT_UNTIL);  // блокирует до refresh

В проде — дорого, не использовать. В тестах — норма.

3. Версия клиента vs версия кластера

elasticsearch-java-клиент 8.x работает с кластером 8.x (с некоторой обратной совместимостью). При апгрейде ES — проверяйте clien'та.

4. @Field(type = FieldType.Keyword) vs Long

category_id записан как Long — для term-запроса работает. Если записать как Keyword строкой — придётся менять mapping (через re-index). Лучше сразу типизировать корректно.

5. Большие документы

ES не любит документы >10 MB. Если нужно хранить большие тексты — поделить на чанки или вынести в S3 + ссылку.

Что почитать дальше

  • Fundamentals — устройство индекса, понимание которого критично для написания клиентского кода.
  • Query DSL и relevance — что генерирует Spring Data ES.
  • Operations — ILM, snapshots, sizing.
  • Распределённые паттерны — Outbox/CDC паттерны.
  • Spring Data Elasticsearch reference — официальная документация.