Spring Boot 集成 Elasticsearch

Elasticsearch (简称ES) 是一个基于 Lucene 的分布式、高扩展、高实时的搜索与数据分析引擎。本章介绍 Spring Boot 应用集成 Elasticsearch ,通过 Spring 封装的API访问 Elasticsearch 。

Spring Data Elasticsearch 是 Spring Data 子项目,提供对 Elasticsearch 集成与访问的支持。需要注意的是 Spring Data Elasticsearch 版本 3.2.x 与 4.x 不兼容,在升级 Spring Boot 版本时要做好评估。本章是以 Spring Data Elasticsearch 4.x 为基础。

依赖

引入 Spring Data Elasticsearch

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

Spring Data Elasticsearch 3.x 只兼容到 Elasticsearch 6.x,Elasticsearch 7.x 需要使用Spring Data Elasticsearch 4.x 版本。这是由于 Spring Data Elasticsearch 依赖 ES 官方客户端,而 ES 在大版本之间提供的 Client 有部分不能兼容。所以在选择版本时最好和 ES 服务器版本一至。

如果使用 Rest Client 需要引入 spring-web ,否则会报 java.lang.ClassNotFoundException: org.springframework.http.HttpHeaders

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

配置

在 application.yml 加上

spring:
  elasticsearch:
    rest:
      connection-timeout: 1s # 连接超时时间
      username: # 连接用户名
      password: # 连接密码
      read-timeout: 30s # 读取超时时间
      uris: ${ES_HOST:localhost}:${ES_PORT:9200} # es rest 接口地址,多个用逗号隔开

创建 ElasticsearchRestTemplate

@Configuration
@EnableElasticsearchRepositories
public class ElasticsearchConfiguration extends AbstractElasticsearchConfiguration {

    @Value("${spring.elasticsearch.rest.uris}")
    private String uris;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {

        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(uris.split(","))
                .withConnectTimeout(Duration.ofSeconds(5))
                .withSocketTimeout(Duration.ofSeconds(30))
                .build();

        return RestClients.create(clientConfiguration).rest();
    }

    @Bean("esRestTemplate")
    public ElasticsearchRestTemplate esRestTemplate() {

        return new ElasticsearchRestTemplate(elasticsearchClient());
    }

}

@EnableElasticsearchRepositories注解可使用 JPA 访问 ES ,basePackages 为 Spring 扫描的包
该段代码配置 Elasticsearch Client ,创建 ElasticsearchRestTemplate 对象。

配置完可在任意地方注入使用 ElasticsearchRestTemplate

@Autowired
private ElasticsearchRestTemplate esRestTemplate;

使用

Spring Data Elasticsearch 对 ES 官方客户端API进行封装,访问 ES 更简单。

ElasticsearchTemplate
ElasticsearchOperations 使用 Transport Client 的接口的实现。从 4.0 版开始不推荐,请改用 ElasticsearchRestTemplate 。

ElasticsearchRestTemplate
ElasticsearchOperations 使用高级REST客户端的接口的实现。

ReactiveElasticsearchTemplate
响应式,默认的实现 ReactiveElasticsearchOperations 。

本章使用 ElasticsearchRestTemplate ,以前面代码配置为例,创建了 ElasticsearchRestTemplate 对象,下面例举一些用代码实现 KQL 语句功能。

  • 定义 LogEntity 类
package com.engrz.lab.springboot.elasticsearch;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.time.LocalDateTime;

/**
 * @author Engr-Z
 * @since 2021/2/5
 */
@Document(indexName = "demo-log")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LogEntity {

    @Id
    private String id;

    /**
     * 日志等级
     */
    @Field(type = FieldType.Keyword)
    private String level;

    /**
     * 日志标题
     */
    @Field(type = FieldType.Text)
    private String title;

    /**
     * 日志内容
     */
    @Field(type = FieldType.Text)
    private String content;

    /**
     * 记录时间
     */
    @Field(type = FieldType.Date, format = DateFormat.date_time)
    private LocalDateTime recordTime;
}

@Document 注解用来声明对应 ES 中一个文档, indexName 是索引名
每个字段可用 @Field 注解声明类型

  • 插入
String id = UUID.randomUUID().toString().replaceAll("-", "");
LogEntity logEntity = new LogEntity(id, "info", "新增", "插入一条数据", LocalDateTime.now());
esRestTemplate.save(logEntity);
  • 修改
Document doc = Document.create();
doc.put("level", "warn");
doc.put("recordTime", LocalDateTime.now());
UpdateQuery.Builder builder = UpdateQuery.builder("6cba54cbc56a49cc8deee516e18b0121")
                                .withDocument(doc);
UpdateQuery updateQuery = builder.build();
esRestTemplate.update(updateQuery, IndexCoordinates.of("demo-log"))

指定修改文档id

  • 删除
esRestTemplate.delete("6cba54cbc56a49cc8deee516e18b0121", IndexCoordinates.of("demo-log"))
  • 查询

因为没有建立索引,以下查询条件字段加上 .keyword 关键字。关于 Elasticsearch KQL 相关的知识会单独讲解

  1. 需求描述:查询日志总数

KQL:

GET demo-log/_count

JAVA:

NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
long count = esRestTemplate.count(searchQueryBuilder.build(), IndexCoordinates.of("demo-log"));
log.info("count -> {}", count);
  1. 需求描述:查找3天以内,级别为 error 的日志,按记录时间倒序,分页,取前20条

KQL:

GET demo-log/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "level": "error"
          }
        },
        {
          "range": {
            "recordTime.keyword": {
              "gte": "2021-02-05T10:00:00",
              "format": "yyyy-MM-dd HH:mm:ss"
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "recordTime.keyword": {
        "order": "desc"
      }
    }
  ],
  "size": 20
}

JAVA:

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

// 查询日志级别为 error 的
boolQueryBuilder.must().add(new TermQueryBuilder("level.keyword", "error"));

// 查询3天内数的数据
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("recordTime.keyword");
rangeQueryBuilder.gte(LocalDateTime.now().minusDays(3));
boolQueryBuilder.must().add(rangeQueryBuilder);

// 分页查询20条
PageRequest pageRequest = PageRequest.of(0, 20, Sort.by("recordTime.keyword").descending());

NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
searchQueryBuilder.withQuery(boolQueryBuilder)
        .withPageable(pageRequest);

Query searchQuery = searchQueryBuilder.build();
SearchHits<LogEntity> hits = esRestTemplate.search(searchQuery, LogEntity.class);
List<SearchHit<LogEntity>> hitList = hits.getSearchHits();
log.info("hit size -> {}", hitList.size());
hitList.forEach(hit -> {
    log.info("返回数据:", hit.getContent().toString());
});

ES 查询默认最多返回10000条数据,超过10000条需要修改 ES 设置或使用滚动查询。 ES 分页方式会用新的章节讲解

  1. 需求描述:按日志级别分组,打印出每个级别的日志数

KQL:

GET demo-log/_search
{
  "aggs": {
    "termLevel": {
      "terms": {
        "field": "level.keyword"
      }
    }
  }
}

JAVA:

TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("aggTerms").field("level.keyword");

NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
searchQueryBuilder.addAggregation(termsAggregationBuilder);

Query searchQuery = searchQueryBuilder.build();
SearchHits<LogEntity> hits = esRestTemplate.search(searchQuery, LogEntity.class);
Terms aggTerms = hits.getAggregations().get("aggTerms");
for (Terms.Bucket bucket : aggTerms.getBuckets()) {
    log.info("level={}, count={}", bucket.getKey(), bucket.getDocCount());
}

AggregationBuilders 类可以创建其他类型的聚合查询,如 AggregationBuilders.sum 计数

整合JPA

我们也可以通过 Spring JPA 访问 ES ,使操作 ES 和访问数据库一样简单。使用 JPA 需要使用 @EnableElasticsearchRepositories 注解,前面在配置的时候有说明。

创建 LogRepository 接口,继承 ElasticsearchRepository


public interface LogRepository extends ElasticsearchRepository<LogEntity, String> {
}

Spring Boot 会自动扫描并装配,在代码中直接注入使用。

@Autowired
private LogRepository logRepository;

通过继承 ElasticsearchRepository 接口, LogRepository 拥有常用的方法。

  • 插入
String id = UUID.randomUUID().toString().replaceAll("-", "");
LogEntity logEntity = new LogEntity(id, "info", "JPA新增", "JPA插入一条数据", LocalDateTime.now());
logRepository.save(logEntity);
  • 修改

未提供。更新数据要重建索引, ES 中不建议大量更新数据。

  • 删除
logRepository.deleteById("cd5c2ac848724acf8d66522e743eed87");
  • 计数
long count = logRepository.count();
log.info("count -> {}", count);
  • 查询

根据id查找:

Optional<LogEntity> entity = logRepository.findById("b64446b58492454c8576504b68767332");
log.info("size -> {}", entity.orElse(null));
  • 自定义条件

Spring JPA 支持把条件写在方法名中,无需手动编写实现方法,框架自动按规则解析。

  1. 需求描述:分页查询指定日志级别数据
List<LogEntity> findByLevel(String level, Pageable pageable);

在 Repository 中的方法如果有 Pageable 参数, Spring 会自动使用 pageable 中的参数信息实现分页和排序。
方法名中支持 and、or、like等查询,也支持lt、gt、lte、gte、eq等比较。可参考官方列表Query creation

KQL 查询

虽然 Spring JPA 支持多个条件查询,但如果条件比较多,方法名会很长。可以使用 @Query 注解传入 KQL 查询

  1. 需求描述:分页查询指定日志级别数据,并支持时间段查询
@Query("{\n" +
        "    \"bool\": {\n" +
        "      \"must\": [\n" +
        "        {\n" +
        "          \"term\": {\n" +
        "            \"level\": ?0\n" +
        "          }\n" +
        "        },\n" +
        "        {\n" +
        "          \"range\": {\n" +
        "            \"recordTime.keyword\": {\n" +
        "              \"gte\": ?1,\n" +
        "              \"lte\": ?2\n" +
        "            }\n" +
        "          }\n" +
        "        }\n" +
        "      ]\n" +
        "    }\n" +
        "  }")
List<LogEntity> exampleQuery(String level, LocalDateTime startDateTime, LocalDateTime endDateTime, Pageable pageable);

一般场景推荐JPA,复杂的查询还是需要ElasticsearchRestTemplate。两者结合使用会更高效简洁。


已发布

分类

来自

评论

《 “Spring Boot 集成 Elasticsearch” 》 有 3 条评论

  1. 纳兰

    @Field(type = FieldType.Date, format = DateFormat.date_time)
    在使用Repository.save时会报错,在使用UpdateRequest时也会报错

  2. 纳兰

    @Field(type = FieldType.Date, format = DateFormat.date_time)
    在使用Repository.save会报错
    在使用UpdateRequest时也会报错

    1. 检查elasticsearch版本,如果有建立索引,检查索引中字段类型

回复 wangzhengzhen 取消回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注