Elasticsearch (简称ES) 是一个基于 Lucene 的分布式、高扩展、高实时的搜索与数据分析引擎。本章介绍 Spring Boot 应用集成 Elasticsearch ,通过 Spring 封装的API访问 Elasticsearch 。
Spring Data Elasticsearch 是 Spring Data 子项目,提供对 Elasticsearch 集成与访问的支持。需要注意的是 Spring Data Elasticsearch 版本 3.2.x 与 4.x 不兼容,在升级 Spring Boot 版本时要做好评估。本章是以 Spring Data Elasticsearch 4.x 为基础。
依赖
引入 Spring Data Elasticsearch
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
Spring Data Elasticsearch 3.x 只兼容到 Elasticsearch 6.x,Elasticsearch 7.x 需要使用Spring Data Elasticsearch 4.x 版本。这是由于 Spring Data Elasticsearch 依赖 ES 官方客户端,而 ES 在大版本之间提供的 Client 有部分不能兼容。所以在选择版本时最好和 ES 服务器版本一至。
如果使用 Rest Client 需要引入 spring-web ,否则会报 java.lang.ClassNotFoundException: org.springframework.http.HttpHeaders
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置
在 application.yml 加上
spring:
elasticsearch:
rest:
connection-timeout: 1s # 连接超时时间
username: # 连接用户名
password: # 连接密码
read-timeout: 30s # 读取超时时间
uris: ${ES_HOST:localhost}:${ES_PORT:9200} # es rest 接口地址,多个用逗号隔开
创建 ElasticsearchRestTemplate
@Configuration
@EnableElasticsearchRepositories
public class ElasticsearchConfiguration extends AbstractElasticsearchConfiguration {
@Value("${spring.elasticsearch.rest.uris}")
private String uris;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(uris.split(","))
.withConnectTimeout(Duration.ofSeconds(5))
.withSocketTimeout(Duration.ofSeconds(30))
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean("esRestTemplate")
public ElasticsearchRestTemplate esRestTemplate() {
return new ElasticsearchRestTemplate(elasticsearchClient());
}
}
@EnableElasticsearchRepositories
注解可使用 JPA 访问 ES ,basePackages 为 Spring 扫描的包
该段代码配置 Elasticsearch Client ,创建 ElasticsearchRestTemplate 对象。
配置完可在任意地方注入使用 ElasticsearchRestTemplate
@Autowired
private ElasticsearchRestTemplate esRestTemplate;
使用
Spring Data Elasticsearch 对 ES 官方客户端API进行封装,访问 ES 更简单。
ElasticsearchTemplate
ElasticsearchOperations 使用 Transport Client 的接口的实现。从 4.0 版开始不推荐,请改用 ElasticsearchRestTemplate 。
ElasticsearchRestTemplate
ElasticsearchOperations 使用高级REST客户端的接口的实现。
ReactiveElasticsearchTemplate
响应式,默认的实现 ReactiveElasticsearchOperations 。
本章使用 ElasticsearchRestTemplate ,以前面代码配置为例,创建了 ElasticsearchRestTemplate 对象,下面例举一些用代码实现 KQL 语句功能。
- 定义 LogEntity 类
package com.engrz.lab.springboot.elasticsearch;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.time.LocalDateTime;
/**
* @author Engr-Z
* @since 2021/2/5
*/
@Document(indexName = "demo-log")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LogEntity {
@Id
private String id;
/**
* 日志等级
*/
@Field(type = FieldType.Keyword)
private String level;
/**
* 日志标题
*/
@Field(type = FieldType.Text)
private String title;
/**
* 日志内容
*/
@Field(type = FieldType.Text)
private String content;
/**
* 记录时间
*/
@Field(type = FieldType.Date, format = DateFormat.date_time)
private LocalDateTime recordTime;
}
@Document
注解用来声明对应 ES 中一个文档, indexName 是索引名
每个字段可用@Field
注解声明类型
- 插入
String id = UUID.randomUUID().toString().replaceAll("-", "");
LogEntity logEntity = new LogEntity(id, "info", "新增", "插入一条数据", LocalDateTime.now());
esRestTemplate.save(logEntity);
- 修改
Document doc = Document.create();
doc.put("level", "warn");
doc.put("recordTime", LocalDateTime.now());
UpdateQuery.Builder builder = UpdateQuery.builder("6cba54cbc56a49cc8deee516e18b0121")
.withDocument(doc);
UpdateQuery updateQuery = builder.build();
esRestTemplate.update(updateQuery, IndexCoordinates.of("demo-log"))
指定修改文档id
- 删除
esRestTemplate.delete("6cba54cbc56a49cc8deee516e18b0121", IndexCoordinates.of("demo-log"))
- 查询
因为没有建立索引,以下查询条件字段加上 .keyword 关键字。关于 Elasticsearch KQL 相关的知识会单独讲解
- 需求描述:查询日志总数
KQL:
GET demo-log/_count
JAVA:
NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
long count = esRestTemplate.count(searchQueryBuilder.build(), IndexCoordinates.of("demo-log"));
log.info("count -> {}", count);
- 需求描述:查找3天以内,级别为 error 的日志,按记录时间倒序,分页,取前20条
KQL:
GET demo-log/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"level": "error"
}
},
{
"range": {
"recordTime.keyword": {
"gte": "2021-02-05T10:00:00",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
]
}
},
"sort": [
{
"recordTime.keyword": {
"order": "desc"
}
}
],
"size": 20
}
JAVA:
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
// 查询日志级别为 error 的
boolQueryBuilder.must().add(new TermQueryBuilder("level.keyword", "error"));
// 查询3天内数的数据
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("recordTime.keyword");
rangeQueryBuilder.gte(LocalDateTime.now().minusDays(3));
boolQueryBuilder.must().add(rangeQueryBuilder);
// 分页查询20条
PageRequest pageRequest = PageRequest.of(0, 20, Sort.by("recordTime.keyword").descending());
NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
searchQueryBuilder.withQuery(boolQueryBuilder)
.withPageable(pageRequest);
Query searchQuery = searchQueryBuilder.build();
SearchHits<LogEntity> hits = esRestTemplate.search(searchQuery, LogEntity.class);
List<SearchHit<LogEntity>> hitList = hits.getSearchHits();
log.info("hit size -> {}", hitList.size());
hitList.forEach(hit -> {
log.info("返回数据:", hit.getContent().toString());
});
ES 查询默认最多返回10000条数据,超过10000条需要修改 ES 设置或使用滚动查询。 ES 分页方式会用新的章节讲解
- 需求描述:按日志级别分组,打印出每个级别的日志数
KQL:
GET demo-log/_search
{
"aggs": {
"termLevel": {
"terms": {
"field": "level.keyword"
}
}
}
}
JAVA:
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("aggTerms").field("level.keyword");
NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
searchQueryBuilder.addAggregation(termsAggregationBuilder);
Query searchQuery = searchQueryBuilder.build();
SearchHits<LogEntity> hits = esRestTemplate.search(searchQuery, LogEntity.class);
Terms aggTerms = hits.getAggregations().get("aggTerms");
for (Terms.Bucket bucket : aggTerms.getBuckets()) {
log.info("level={}, count={}", bucket.getKey(), bucket.getDocCount());
}
AggregationBuilders 类可以创建其他类型的聚合查询,如 AggregationBuilders.sum 计数
整合JPA
我们也可以通过 Spring JPA 访问 ES ,使操作 ES 和访问数据库一样简单。使用 JPA 需要使用 @EnableElasticsearchRepositories
注解,前面在配置的时候有说明。
创建 LogRepository 接口,继承 ElasticsearchRepository
public interface LogRepository extends ElasticsearchRepository<LogEntity, String> {
}
Spring Boot 会自动扫描并装配,在代码中直接注入使用。
@Autowired
private LogRepository logRepository;
通过继承 ElasticsearchRepository 接口, LogRepository 拥有常用的方法。
- 插入
String id = UUID.randomUUID().toString().replaceAll("-", "");
LogEntity logEntity = new LogEntity(id, "info", "JPA新增", "JPA插入一条数据", LocalDateTime.now());
logRepository.save(logEntity);
- 修改
未提供。更新数据要重建索引, ES 中不建议大量更新数据。
- 删除
logRepository.deleteById("cd5c2ac848724acf8d66522e743eed87");
- 计数
long count = logRepository.count();
log.info("count -> {}", count);
- 查询
根据id查找:
Optional<LogEntity> entity = logRepository.findById("b64446b58492454c8576504b68767332");
log.info("size -> {}", entity.orElse(null));
- 自定义条件
Spring JPA 支持把条件写在方法名中,无需手动编写实现方法,框架自动按规则解析。
- 需求描述:分页查询指定日志级别数据
List<LogEntity> findByLevel(String level, Pageable pageable);
在 Repository 中的方法如果有 Pageable 参数, Spring 会自动使用 pageable 中的参数信息实现分页和排序。
方法名中支持 and、or、like等查询,也支持lt、gt、lte、gte、eq等比较。可参考官方列表Query creation
KQL 查询
虽然 Spring JPA 支持多个条件查询,但如果条件比较多,方法名会很长。可以使用 @Query
注解传入 KQL 查询
- 需求描述:分页查询指定日志级别数据,并支持时间段查询
@Query("{\n" +
" \"bool\": {\n" +
" \"must\": [\n" +
" {\n" +
" \"term\": {\n" +
" \"level\": ?0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"range\": {\n" +
" \"recordTime.keyword\": {\n" +
" \"gte\": ?1,\n" +
" \"lte\": ?2\n" +
" }\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
" }")
List<LogEntity> exampleQuery(String level, LocalDateTime startDateTime, LocalDateTime endDateTime, Pageable pageable);
一般场景推荐JPA,复杂的查询还是需要ElasticsearchRestTemplate。两者结合使用会更高效简洁。
除非注明,否则均为”攻城狮-正“原创文章,请注明出处。
发表回复