В UCP-стеке Elasticsearch редко источник правды. Чаще — read-only поисковый индекс рядом с PostgreSQL/MongoDB-OLTP. Главные вопросы:
- Как делать запросы из Spring (
ElasticsearchOperations,ElasticsearchRepository). - Как синхронизировать данные из основной БД в ES (dual write, outbox, CDC).
Эта статья — про оба.
Подключение
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-elasticsearch")
}
spring.elasticsearch.uris=http://elasticsearch:9200
spring.elasticsearch.username=elastic
spring.elasticsearch.password=${ES_PASSWORD}
spring.elasticsearch.connection-timeout=2s
spring.elasticsearch.socket-timeout=10s
Spring Boot создаёт RestClient под капотом (поверх HTTP), плюс ElasticsearchOperations (high-level) и автоматический сканер ElasticsearchRepository-интерфейсов.
Маппинг entity
@Document(indexName = "products")
@Setting(settingPath = "elasticsearch/product-settings.json")
public class ProductDoc {
@Id
private String id;
@Field(type = FieldType.Text, analyzer = "russian")
private String name;
@MultiField(
mainField = @Field(type = FieldType.Text, analyzer = "russian"),
otherFields = {
@InnerField(suffix = "raw", type = FieldType.Keyword)
}
)
private String description;
@Field(type = FieldType.Long)
private Long categoryId;
@Field(type = FieldType.ScaledFloat, scalingFactor = 100)
private BigDecimal price;
@Field(type = FieldType.Boolean)
private Boolean inStock;
@Field(type = FieldType.Date, format = DateFormat.date_time)
private Instant createdAt;
// getters, setters, no @Entity (это не JPA)
}
@Document(indexName = "products") — связывает класс с индексом. @Field — указывает тип и параметры. @MultiField — то же, что multifields в mapping API.
Если хотите более тонкий контроль над mapping — @Mapping(mappingPath = "elasticsearch/product-mapping.json") с явным JSON-файлом.
ElasticsearchRepository — для простых случаев
public interface ProductRepository extends ElasticsearchRepository<ProductDoc, String> {
Page<ProductDoc> findByCategoryId(Long categoryId, Pageable pageable);
List<ProductDoc> findByNameContainingAndInStockTrue(String namePart);
long countByPriceBetween(BigDecimal min, BigDecimal max);
}
Spring парсит имена методов и генерирует Query DSL. Работает для простых запросов: фильтры, sort, paging.
Минусы:
- Имена методов растут до 8-10 слов, читать тяжело.
- Нет контроля над scoring, fuzziness, boost.
- Aggregations не покрываются.
Для serious запросов — ElasticsearchOperations.
ElasticsearchOperations — для серьёзных запросов
@Service
@RequiredArgsConstructor
public class ProductSearchService {
private final ElasticsearchOperations elasticsearch;
public SearchHits<ProductDoc> search(String text, Set<Long> categories,
BigDecimal minPrice, BigDecimal maxPrice,
Pageable pageable) {
var criteria = new Criteria("name").matches(text)
.and(new Criteria("inStock").is(true));
if (!categories.isEmpty()) {
criteria = criteria.and(new Criteria("categoryId").in(categories));
}
if (minPrice != null) {
criteria = criteria.and(new Criteria("price").greaterThanEqual(minPrice));
}
if (maxPrice != null) {
criteria = criteria.and(new Criteria("price").lessThanEqual(maxPrice));
}
var query = new CriteriaQuery(criteria, pageable);
return elasticsearch.search(query, ProductDoc.class);
}
}
Если нужны фичи, которых нет в CriteriaQuery (function_score, custom aggregations, suggester) — использовать низкоуровневый NativeQuery с лямбдой, формирующей весь DSL:
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
public SearchHits<ProductDoc> searchWithFunctionScore(String text) {
var query = NativeQuery.builder()
.withQuery(q -> q
.functionScore(fs -> fs
.query(qq -> qq.match(m -> m.field("name").query(text)))
.functions(f -> f
.gauss(g -> g
.field("createdAt")
.placement(p -> p.origin("now").scale("30d").decay(0.5))))
))
.build();
return elasticsearch.search(query, ProductDoc.class);
}
Это уже близко к написанию raw JSON-DSL, но с type-safety.
Bulk-индексация
Индексировать по одному документу — медленно (HTTP-overhead + переключение refresh'а). На массовых импортах используем bulk:
@Service
@RequiredArgsConstructor
public class ProductIndexer {
private final ElasticsearchOperations elasticsearch;
public void reindexAll(List<ProductDoc> docs) {
var queries = docs.stream()
.map(doc -> new IndexQueryBuilder()
.withId(doc.getId())
.withObject(doc)
.build())
.toList();
elasticsearch.bulkIndex(queries, ProductDoc.class);
}
}
Bulk даёт 10-50× speedup. Для совсем больших массивов (миллионы документов) — разбивать на пачки по 500-5000 и временно отключать refresh:
public void bulkReindex(List<ProductDoc> docs) {
var indexOps = elasticsearch.indexOps(ProductDoc.class);
// отключить refresh
indexOps.putSettings(Map.of("index.refresh_interval", "-1"));
try {
Lists.partition(docs, 5000).forEach(this::reindexAll);
} finally {
// вернуть и forced refresh
indexOps.putSettings(Map.of("index.refresh_interval", "1s"));
indexOps.refresh();
}
}
Синхронизация PG/Mongo → ES
Главная архитектурная развилка при работе с ES в UCP-стеке. Четыре подхода:
Подход 1: Dual Write — простой и опасный
В одной транзакции пишем в БД и в ES:
@Transactional
public void save(Product product) {
productRepo.save(product); // PG
elasticsearch.save(toDoc(product)); // ES
}
Проблема: если PG-commit прошёл, а ES-запрос упал — данные разъехались. Восстановить тяжело, нужен periodic reconcile. Антипаттерн для production. Подходит только для прототипов.
Подход 2: Transactional Outbox
В транзакции БД пишем бизнес-данные и событие в таблицу outbox:
@Transactional
public void save(Product product) {
productRepo.save(product);
outboxRepo.save(new OutboxEvent(
UUID.randomUUID(),
"product.updated",
toJson(product)
));
}
Отдельный процесс читает outbox и индексирует в ES:
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishToEs() {
var batch = outboxRepo.fetchUnpublished(500);
for (var event : batch) {
var doc = parseProductDoc(event.payload());
elasticsearch.save(doc);
outboxRepo.markPublished(event.id());
}
}
Плюсы: atomic с БД, retry бесплатно. Минусы: дополнительная инфраструктура, идемпотентность на стороне ES (но save по _id уже идемпотентен).
Detail про Outbox.
Подход 3: CDC через Debezium → Kafka → ES Sink Connector
Самый production-grade вариант:
PostgreSQL ──── Debezium ────→ Kafka ────→ Kafka Connect ES Sink ────→ Elasticsearch
WAL/logical
replication
- Debezium читает WAL/logical replication PostgreSQL (или oplog MongoDB), производит change events в Kafka.
- Elasticsearch Sink Connector читает topic и пишет в ES (insert/update/delete).
- Никакого кода в основном сервисе — он просто пишет в БД как обычно.
Плюсы: полная развязка, гарантия доставки, ловит все изменения (включая операционные правки руками в БД). Минусы: инфраструктура Debezium + Kafka Connect + monitoring отдельно, latency ~100-500ms.
Это стандартный подход для зрелых UCP-сервисов с серьёзным ES-индексом.
Подход 4: Полный re-index по расписанию
Когда данные меняются редко (каталоги, словари), можно периодически пересоздавать индекс с нуля:
@Scheduled(cron = "0 0 3 * * *", zone = "Europe/Moscow")
public void reindexCatalog() {
var newIndex = "products-v" + System.currentTimeMillis();
elasticsearch.indexOps(IndexCoordinates.of(newIndex)).create(/* settings, mapping */);
productRepo.findAll().forEach(product -> {
elasticsearch.save(toDoc(product), IndexCoordinates.of(newIndex));
});
// alias-переключение: read-pointer "products" теперь смотрит на новый индекс
elasticsearch.indexOps(IndexCoordinates.of("products")).alias(
new AliasActions().add(new AliasAction.Add(
AliasActionParameters.builderForAdd().withIndices(newIndex).withAliases("products").build()
))
);
}
Прост и надёжен для каталогов на ~миллион документов. Не подходит, когда нужна near-real-time индексация.
Aliases — переключение индексов
Никогда не указывайте клиенту реальное имя индекса. Используйте alias — указатель, который можно атомарно переключать:
PUT /products-v1
POST /_aliases
{
"actions": [
{ "add": { "index": "products-v1", "alias": "products" } }
]
}
# приложение читает по alias "products"
# когда надо передеплоить: создаём products-v2, заполняем, потом:
POST /_aliases
{
"actions": [
{ "remove": { "index": "products-v1", "alias": "products" } },
{ "add": { "index": "products-v2", "alias": "products" } }
]
}
# приложение незаметно для себя начинает читать новый индекс
Это даёт zero-downtime miграции схемы (mapping не позволяет изменить тип — переход через новый индекс + alias).
Ловушки
1. ES не транзакционен
elasticsearch.save(doc) коммитится сразу, не откатывается с @Transactional PostgreSQL. Любой подход синхронизации должен это учитывать.
2. Refresh interval по умолчанию = 1s
Документ, сохранённый секунду назад, может ещё не быть виден в поиске. Если в integration-тесте ищите только что сохранённый документ — нужен elasticsearch.indexOps(...).refresh() или ставьте refresh: WAIT_UNTIL:
elasticsearch.save(doc, RefreshPolicy.WAIT_UNTIL); // блокирует до refresh
В проде — дорого, не использовать. В тестах — норма.
3. Версия клиента vs версия кластера
elasticsearch-java-клиент 8.x работает с кластером 8.x (с некоторой обратной совместимостью). При апгрейде ES — проверяйте clien'та.
4. @Field(type = FieldType.Keyword) vs Long
category_id записан как Long — для term-запроса работает. Если записать как Keyword строкой — придётся менять mapping (через re-index). Лучше сразу типизировать корректно.
5. Большие документы
ES не любит документы >10 MB. Если нужно хранить большие тексты — поделить на чанки или вынести в S3 + ссылку.
Что почитать дальше
- Fundamentals — устройство индекса, понимание которого критично для написания клиентского кода.
- Query DSL и relevance — что генерирует Spring Data ES.
- Operations — ILM, snapshots, sizing.
- Распределённые паттерны — Outbox/CDC паттерны.
- Spring Data Elasticsearch reference — официальная документация.