SpringBoot集成Elasticsearch 实战(1)
1. 目录
2. SpringBoot集成
开发工具这里选择的是 IDEA 2021.1.2,构建 Gradle 工程等一系列通用操作不再赘述,不清楚的读者可自行百度,或者参看《90分钟玩转Gradle》。
2.1. 依赖配置
我这边选择 spring-boot-starter-data-elasticsearch 方式来集成。需要注意 spring-boot 中集成的 Elasticsearch 版本号与实际安装的版本号之间的差异,尽量选择一致的版本,否则在集成过程中会出现莫名的问题,读者在选择的时候多加留意。
api("org.springframework.boot:spring-boot-starter-data-elasticsearch")
我在此基础上封装了一层 persistence-elasticsearch,更贴近一般项目的使用方式,可从以下仓库获取:
- 中央仓库下载
- 阿里云的仓库下载
2.2. 核心操作类
为了规范索引管理,这里将所有的操作都封装成一个基类,实现对索引的增删改查,同时还集成了对数据的单个以及批量插入与删除。这样可以避免针对每个索引都单独写一套实现,杜绝代码冗余,同时这种集成方式对代码结构本身也是低侵入的。
- AbstractElasticIndexManger
/**
 * Base class that wraps common Elasticsearch operations.
 *
 * <p>Covered operations: index settings, counting, searching, update-by-query, delete-by-query
 * and bool-query construction. Concrete index managers reuse these instead of re-implementing
 * them per index.
 */
public abstract class AbstractElasticIndexManger {

    /**
     * Scroll batch size used by delete-by-query.
     *
     * <p>Elasticsearch rejects scroll batches larger than the index setting
     * {@code index.max_result_window} (10000 by default). A larger value makes every
     * delete-by-query request fail.
     */
    private static final int DELETE_BY_QUERY_BATCH_SIZE = 10_000;

    /** Number of hits returned by {@link #search(BoolQueryBuilder, Class, String...)}. */
    private static final int DEFAULT_SEARCH_SIZE = 20;

    protected ElasticsearchRestTemplate elasticsearchRestTemplate;
    protected RestHighLevelClient restHighLevelClient;

    @Autowired
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    @Autowired
    public void setElasticsearchRestTemplate(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
    }

    /**
     * Sets the shard and replica counts on an index-creation request.
     * Replicas mainly protect the data against node loss.
     *
     * @param request  the create-index request to configure
     * @param replicas value for {@code index.number_of_replicas}
     * @param shards   value for {@code index.number_of_shards}
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder()
                .put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

    /**
     * Counts the documents matching {@code builder}.
     *
     * <p>Several indices can be queried at once by passing several index names.
     *
     * @param builder    the bool query to match
     * @param indexNames one or more index names
     * @return number of matching documents
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected long count(BoolQueryBuilder builder, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        return elasticsearchRestTemplate.count(nativeSearchQueryBuilder.build(), IndexCoordinates.of(indexNames));
    }

    /**
     * Returns the first page (at most 20 hits) of documents matching {@code builder}.
     *
     * <p>Several indices can be queried at once by passing several index names.
     *
     * @param builder    the bool query to match
     * @param clazz      entity class the hits are mapped to
     * @param indexNames one or more index names
     * @return the hits of the first result page
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits search(BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        // PageRequest page indices are zero-based. Page 1 would skip the first DEFAULT_SEARCH_SIZE hits.
        nativeSearchQueryBuilder.withPageable(PageRequest.of(0, DEFAULT_SEARCH_SIZE));
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    /**
     * Returns one page of documents matching {@code builder}.
     *
     * <p>Several indices can be queried at once by passing several index names.
     *
     * @param page       zero-based page index (Spring Data {@link PageRequest} semantics)
     * @param size       page size
     * @param builder    the bool query to match
     * @param clazz      entity class the hits are mapped to
     * @param indexNames one or more index names
     * @return the hits of the requested page
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        nativeSearchQueryBuilder.withPageable(PageRequest.of(page, size));
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    /**
     * Builds a delete-by-query request over the given indices.
     *
     * <p>Version conflicts are skipped ({@code conflicts=proceed}) rather than aborting the request.
     *
     * @param builder    documents matching this query are deleted
     * @param indexNames one or more index names
     * @return the configured request
     */
    protected DeleteByQueryRequest builderDeleteRequest(QueryBuilder builder, String... indexNames) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setBatchSize(DELETE_BY_QUERY_BATCH_SIZE);
        request.setConflicts("proceed");
        return request;
    }

    /**
     * Runs an update-by-query that assigns every entry of {@code params} to the matching documents.
     *
     * @param params     field name to new value
     * @param builder    the bool query selecting the documents to update
     * @param indexNames one or more index names
     * @return the update response, or {@code null} if the request failed with an I/O error
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected BulkByScrollResponse update(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        UpdateByQueryRequest request = buildUpdateByQueryReq(params, builder, indexNames);
        try {
            return restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Best-effort contract kept from the original: callers receive null on I/O failure.
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Builds an update-by-query request.
     *
     * <p>The request uses a script derived from {@code params}. Conflicts are skipped, the
     * affected indices are refreshed afterwards, and the timeout is 3 minutes.
     *
     * @param params     field name to new value
     * @param builder    the bool query selecting the documents to update
     * @param indexNames one or more index names
     * @return the configured request
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-15:50
     **/
    protected UpdateByQueryRequest buildUpdateByQueryReq(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        Script script = buildScriptType(params);
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setScript(script);
        request.setConflicts("proceed");
        request.setRefresh(true);
        request.setTimeout(TimeValue.timeValueMinutes(3));
        return request;
    }

    /**
     * Builds an inline script that assigns each parameter to the source field of the same name.
     *
     * <p>Each entry produces a statement of the form {@code ctx._source.<key>=params.<key>;}.
     *
     * <p>NOTE(review): keys are concatenated into the script source. They must be trusted field
     * names, never user input. Values are passed safely as script params.
     *
     * @param params field name to new value
     * @return the inline script
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-13:19
     **/
    protected Script buildScriptType(Map<String, Object> params) {
        StringBuilder source = new StringBuilder();
        for (String key : params.keySet()) {
            source.append("ctx._source.").append(key).append("=params.").append(key).append(";");
        }
        return new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, source.toString(), params);
    }

    /**
     * Adds all must / must_not / should / filter clauses of {@code bool} to {@code builder}.
     *
     * @param builder the bool query being built
     * @param bool    the clause lists to add
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void setBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        mustBuilders(builder, bool);
        mustNotBuilders(builder, bool);
        shouldBuilders(builder, bool);
        filterBuilders(builder, bool);
    }

    /**
     * Adds the {@code must} clauses (must match, contribute to score).
     *
     * <p>A null or empty list is ignored.
     *
     * @param builder the bool query being built
     * @param bool    the clause lists
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> must = bool.getMust();
        if (must == null || must.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : must) {
            builder.must(getQueryBuilder(cds));
        }
    }

    /**
     * Adds the {@code must_not} clauses (must not match).
     *
     * <p>A null or empty list is ignored.
     *
     * @param builder the bool query being built
     * @param bool    the clause lists
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustNotBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> mustNot = bool.getMustNot();
        if (mustNot == null || mustNot.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : mustNot) {
            builder.mustNot(getQueryBuilder(cds));
        }
    }

    /**
     * Adds the {@code should} clauses (optional matches).
     *
     * <p>A null or empty list is ignored.
     *
     * @param builder the bool query being built
     * @param bool    the clause lists
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void shouldBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> should = bool.getShould();
        if (should == null || should.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : should) {
            builder.should(getQueryBuilder(cds));
        }
    }

    /**
     * Adds the {@code filter} clauses (must match, no scoring). Recommended for exact filters.
     *
     * <p>A null or empty list is ignored.
     *
     * @param builder the bool query being built
     * @param bool    the clause lists
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void filterBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> filter = bool.getFilter();
        if (filter == null || filter.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : filter) {
            builder.filter(getQueryBuilder(cds));
        }
    }

    /**
     * Translates one atomic condition into a leaf query, based on its status.
     *
     * <p>Supported statuses are suffix wildcard, single-character wildcard, range, prefix and
     * regexp. Any other status falls back to a term query on the tuple's first value.
     *
     * @param cds the condition to translate
     * @return the corresponding query builder
     */
    public QueryBuilder getQueryBuilder(AtomicCondition cds) {
        Tuple tuple = cds.getTuple();
        switch (cds.getStatus()) {
            case (Constants.SUFFIX_QUERY):
                return QueryBuilders.wildcardQuery(cds.getField(), Constants.MULTI_CHARACTER + tuple.v1());
            case (Constants.SUFFIX_SINGLE_QUERY):
                return QueryBuilders.wildcardQuery(cds.getField(), Constants.SINGLE_CHARACTER + tuple.v1());
            case (Constants.RANGE_QUERY):
                return QueryBuilders.rangeQuery(cds.getField()).from(tuple.v1()).to(tuple.v2());
            case (Constants.PREFIX_QUERY):
                return QueryBuilders.prefixQuery(cds.getField(), tuple.v1().toString());
            case (Constants.REG_QUERY):
                return QueryBuilders.regexpQuery(cds.getField(), tuple.v1().toString());
            default:
                return QueryBuilders.termQuery(cds.getField(), tuple.v1().toString());
        }
    }
}
- ElasticIndexManger
public class ElasticIndexManger extends AbstractElasticIndexManger {
/**
* 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
*
* @param indexName 索引名称
* @param mapping 索引定义,JSON形式的字符串
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:30
*/
public void createIndex(String indexName, String mapping) {
createIndex(indexName, mapping, 0, 1);
}
/**
* 指定索引结构创建索引
*
* @param indexName 索引名称
* @param mapping 索引定义,JSON形式的字符串
* @param replicas 副本的数量
* @param shards 分片数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:30
*/
public void createIndex(String indexName, String mapping, int replicas, int shards) {
try {
if (!this.existIndex(indexName)) {
log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
return;
}
CreateIndexRequest request = new CreateIndexRequest(indexName);
buildSetting(request, replicas, shards);
request.mapping(mapping, XContentType.JSON);
CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
if (!res.isAcknowledged()) {
throw new RuntimeException("初始化失败");
}
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
}
/**
* 获取所有索引,默认为所有索引
*
* @return List
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/10/30-11:54
**/
public List getAllIndex() {
return getAllIndex(Constants.MULTI_CHARACTER);
}
/**
* 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
*
* @param inPattern 正则表达式
* @return List
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/10/30-11:54
**/
public List<String> getAllIndex(String inPattern) {
GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
try {
GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
String[] indices = getIndexResponse.getIndices();
return Arrays.asList(indices);
} catch (IOException e) {
log.error("获取索引失败 {} 已经存在", e.getMessage());
} catch (ElasticsearchStatusException e) {
log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
}
return Collections.EMPTY_LIST;
}
/**
* 制定配置项的判断索引是否存在,注意与 isExistsIndex 区别
* <ul>
* <li>1、可以指定 用本地检索 还是用 主动节点方式检索</li>
* <li>2、是否适应被人读的方式</li>
* <li>3、返回默认设置</li>
* </ul>
*
* @param indexName index名
* @return boolean
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:27
*/
public boolean existIndex(String indexName) throws IOException {
GetIndexRequest request = new GetIndexRequest(indexName);
//TRUE-返回本地信息检索状态,FALSE-还是从主节点检索状态
request.local(false);
//是否适应被人可读的格式返回
request.humanReadable(true);
//是否为每个索引返回所有默认设置
request.includeDefaults(false);
//控制如何解决不可用的索引以及如何扩展通配符表达式,忽略不可用索引的索引选项,仅将通配符扩展为开放索引,并且不允许从通配符表达式解析任何索引
request.indicesOptions(IndicesOptions.lenientExpandOpen());
return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
}
/**
* 单纯断某个索引是否存在
*
* @param indexName index名
* @return boolean 存在为True,不存在则 False
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:27
*/
public boolean isIndexExists(String indexName) throws Exception {
return restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
}
/**
* 批量插入数据,通过 {@link List} 的对象集合进行插入,此处对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
*
* @param indexName index
* @param list 列表
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:26
*/
public void batch(String indexName, List<? extends BasePo> list) throws IOException {
int sleep = 15;
BulkRequest request = new BulkRequest();
list.forEach(item -> request.add(new IndexRequest(indexName)
.id(item.getId().toString())
.source(JSON.toJSONString(item), XContentType.JSON)));
try {
BulkResponse bulkResponse = bulk(request);
log.error("[Verification BulkResponse bulk 操作结果] {}, 文件大小 {} ", bulkResponse.status(), list.size());
if (bulkResponse.hasFailures()) {
log.error(bulkResponse.buildFailureMessage());
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
log.error(bulkItemResponse.getFailureMessage());
}
}
log.error("批量操作失败,重新再提交一次,间隔时间{}, 文件大小 {} ", sleep, list.size());
TimeUnit.SECONDS.sleep(sleep);
bulkResponse = bulk(request);
if (bulkResponse.hasFailures()) {
log.error("再次提交失败,需要写入MQ , 文件大小 {} ", list.size());
}
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
/**
* bulk 方式批量提交
*
* @param request {@link BulkRequest} 请求
* @return BulkResponse
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/10/24-15:50
**/
private BulkResponse bulk(BulkRequest request) throws IOException {
return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
}
/**
* <p>
* 批量插入数据,通过 {@link List} 的对象集合进行插入,提交前,判断 该索引是否存在不存在则直接创建 该索引
* 并对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
* </p>
*
* @param indexName index
* @param list 列表
* @param created 当索引不存在,则创建索引,默认为 true,即索引不存在,创建该索引,此时 mapping 应该不为空
* @param mapping 索引定义,JSON形式的字符串
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:26
*/
public void batch(List<? extends BasePo> list, String indexName, boolean created, String mapping) throws Exception {
try {
if (!isIndexExists(indexName)) {
log.error("[Index does not exist] Rebuilding index. IndexName ={}", indexName);
if (created && StringUtils.isNotBlank(mapping)) {
createIndex(indexName, mapping);
} else {
log.error("[Index does not exist , No index creation] IndexName ={}", indexName);
return;
}
}
batch(indexName, list);
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
/**
* 批量删除,根据索引名称,删除索引下数据
*
* @param indexName index
* @param idList 待删除列表
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:14
*/
public <T> void deleteBatch(String indexName, Collection<T> idList) {
BulkRequest request = new BulkRequest();
idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
try {
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 根据索引名称,和 {@link SearchSourceBuilder} 条件,以及返回对象实体类,返回列表
*
* @param indexName index
* @param builder 查询参数
* @param clazz 结果类对象
* @return java.util.List<T>
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:14
*/
public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> clazz) {
List res = Collections.EMPTY_LIST;
try {
SearchRequest request = new SearchRequest(indexName);
request.source(builder);
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
res = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
res.add(JSON.parseObject(hit.getSourceAsString(), clazz));
}
} catch (IOException e) {
log.error("[ElasticSearch] connect err ,err-msg {}", e.getMessage());
} catch (Exception e) {
throw new RuntimeException(e);
}
return res;
}
/**
* 删除 index,以及索引下数据
*
* @param indexName 索引名字
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:13
*/
public void deleteIndex(String indexName) {
try {
restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 删除索引下数据,但是不删除索引结构
*
* @param builder 条件构建模式
* @param indexNames 索引名称列表
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2019/10/17 17:13
*/
public void deleteByQuery(QueryBuilder builder, String... indexNames) {
try {
DeleteByQueryRequest request = builderDeleteRequest(builder, indexNames);
BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 不推荐使用,原因为不够灵活,获取该索引下可以匹配的数量,支持 模糊查询和精确查询,
* 用法 在 方法 <b>field</b> 的处理上。
* <ul>
* <li>模糊匹配模式:字段</li>
* <li>精确匹配模式:字段.类型</li>
* </ul>
*
* @param indexName 文档索引名
* @param field 字段
* @param text 内容
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2018/07/20-20:47
**/
@Deprecated
public long countMatchPhrasePrefixQuery(String indexName, String field, String text) {
CountRequest countRequest = new CountRequest(indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchPhrasePrefixQuery(field, text));
countRequest.source(searchSourceBuilder);
CountResponse countResponse = null;
try {
countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return countResponse == null ? 0L : countResponse.getCount();
}
/**
* 按照字段 内容进行精确匹配,返回匹配的数量
*
* @param field 字段名
* @param content 内容
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:49
**/
public long exactCondition(String field, String content, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(QueryBuilders.termQuery(field, content));
return count(builder, indexNames);
}
/**
* 按照字段的前缀内容进行匹配,返回匹配的数量
*
* @param field 字段名
* @param prefix 前缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:49
**/
public long prefix(String field, String prefix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(QueryBuilders.prefixQuery(field, prefix));
return count(builder, indexNames);
}
/**
* 按照字段对 内容进行后缀匹配,返回匹配的数量
*
* @param suffix 后缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:56
**/
public long suffix(String field, String suffix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
return count(builder, indexNames);
}
/**
* 字段的前缀和后缀都必须满足条件
*
* @param field 字段
* @param prefix 前缀
* @param suffix 后缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:59
**/
public long prefixAndSuffix(String field, String prefix, String suffix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(QueryBuilders.prefixQuery(field, prefix));
builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
return count(builder, indexNames);
}
/**
* 字段的前缀和后缀都满足一个条件按即可
*
* @param field 字段
* @param prefix 前缀
* @param suffix 后缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:59
**/
public long prefixOrSuffix(String field, String prefix, String suffix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.should(QueryBuilders.prefixQuery(field, prefix));
builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
return count(builder, indexNames);
}
/**
* 字段的前缀必须满足,而 后缀则不要求 不一定满足
*
* @param field 字段
* @param prefix 前缀
* @param suffix 后缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:59
**/
public long prefixMustSuffixShould(String field, String prefix, String suffix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must(QueryBuilders.prefixQuery(field, prefix));
builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
return count(builder, indexNames);
}
/**
* 字段的前缀选择性满足,而 后缀则一定要满足
*
* @param field 字段
* @param prefix 前缀
* @param suffix 后缀
* @param indexNames 索引名
* @return long 数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-10:59
**/
public long prefixShouldSuffixMust(String field, String prefix, String suffix, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.should(QueryBuilders.prefixQuery(field, prefix));
builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
return count(builder, indexNames);
}
/**
* 查询总数
*
* @param indexNames 索引文档名称,可以是多个
* @return long 匹配的数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/10/29-21:11
**/
public long total(String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
return count(builder, indexNames);
}
/**
* 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
*
* @param params Map形式的 字段名 和 字段内容 组成的条件
* @param bool 复合条件封装
* @param indexNames 索引名,可以一次性查询多个
* @return long 最终数量
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-9:26
**/
public BulkByScrollResponse update(Map<String, Object> params, BoolCondition bool, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
setBuilders(builder,bool);
return update(params,builder,indexNames);
}
/**
* 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
*
* @param page 当前页
* @param size 每页大小
* @param clazz Class对象
* @param indexNames 索引名,可以一次性查询多个
* @return SearchHits 命中结果的数据集
* @author <a href="https://github.com/rothschil">Sam</a>
* @date 2021/11/1-9:26
**/
protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolCondition bool,Class<? extends BasePo> clazz, String... indexNames) {
BoolQueryBuilder builder = QueryBuilders.boolQuery();
setBuilders(builder,bool);
return searchPage(page,size,builder, clazz, indexNames);
}
}
3. 项目代码
通过以上的集成,我们完成了在项目中对 elasticsearch 的集成,同时也用基类将所有可能的操作都封装了起来。接下来我们通过对基类的讲解,逐个进行说明!
3.1. 索引管理
由于 ElasticIndexManger 类中定义了所有方法,直接调用即可。
3.1.1. 创建索引
创建索引时需要先判断该索引是否已经存在,若已存在则不允许重复创建。由于我的案例采用的是手动指定 indexName 和 Settings 的方式,大家阅读时要特别注意;另外还有一点:indexName 必须是小写,如果包含大写字母,创建过程中会报错。详细的代码实现见如下:
/**
 * Creates an index with the default layout: a single primary shard and no replicas.
 *
 * @param indexName name of the index to create (lower case)
 * @param mapping   index mapping, given as a JSON string
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2019/10/17 17:30
 */
public void createIndex(String indexName, String mapping) {
    final int defaultReplicas = 0;
    final int defaultShards = 1;
    createIndex(indexName, mapping, defaultReplicas, defaultShards);
}
/**
 * Creates an index with the given mapping, replica count and shard count.
 *
 * <p>If the index already exists, the method only logs a warning and returns.
 *
 * @param indexName index name (must be lower case)
 * @param mapping   index mapping as a JSON string
 * @param replicas  number of replicas
 * @param shards    number of primary shards
 * @throws RuntimeException if the cluster cannot be reached or does not acknowledge the creation
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2019/10/17 17:30
 */
public void createIndex(String indexName, String mapping, int replicas, int shards) {
    try {
        // Skip only when the index already exists. The previous inverted check skipped missing
        // indices and tried to re-create existing ones.
        if (this.existIndex(indexName)) {
            log.warn(" indexName={} 已经存在,mapping={}", indexName, mapping);
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        buildSetting(request, replicas, shards);
        request.mapping(mapping, XContentType.JSON);
        CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        if (!res.isAcknowledged()) {
            throw new RuntimeException("初始化失败");
        }
    } catch (IOException e) {
        // Report the failure to the caller instead of terminating the JVM (System.exit).
        throw new RuntimeException("初始化失败", e);
    }
}
创建索引时需要设置分片,这里采用 Settings.Builder 方式,当然也可以用 JSON 自定义的方式,本文篇幅有限,不做演示。
index.number_of_shards:分片数
index.number_of_replicas:副本数
/**
 * Applies the primary-shard and replica counts to an index-creation request.
 * Replicas exist mainly to keep the data safe.
 *
 * @param request  create-index request to configure
 * @param replicas value for {@code index.number_of_replicas}
 * @param shards   value for {@code index.number_of_shards}
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2019/10/17 19:27
 */
protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
    Settings.Builder indexSettings = Settings.builder();
    indexSettings.put("index.number_of_shards", shards);
    indexSettings.put("index.number_of_replicas", replicas);
    request.settings(indexSettings);
}
[elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open twitter scSSD1SfRCio4F77Hh8aqQ 3 2 2 0 8.3kb 8.3kb
yellow open idx_location _BJ_pOv0SkS4tv-EC3xDig 3 2 1 0 4kb 4kb
yellow open wongs uT13XiyjSW-VOS3GCqao8w 3 2 1 0 3.4kb 3.4kb
yellow open idx_locat Kr3wGU7JT_OUrRJkyFSGDw 3 2 3 0 13.2kb 13.2kb
yellow open idx_copy_to HouC9s6LSjiwrJtDicgY3Q 3 2 1 0 4kb 4kb
以上输出说明创建成功。不过每次都通过命令行来验证有点繁琐,既然已经有了 WEB 服务,为什么不直接通过 HTTP 接口来验证呢?
3.1.2. 查看索引
查看索引这个操作支持模糊匹配,即以通配符 * 匹配一个或多个字符。这个操作在实际应用中非常好用,将来有机会讲到 Index 设计时,它会显得尤为重要。
/**
 * Lists every index in the cluster by delegating with the match-everything wildcard.
 *
 * @return names of all indices; empty when the lookup fails
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2021/10/30-11:54
 **/
public List getAllIndex() {
    String everyIndex = Constants.MULTI_CHARACTER;
    return getAllIndex(everyIndex);
}
/**
 * Returns the names of the indices matching an index-name pattern.
 *
 * <p>The pattern is an Elasticsearch index expression, where {@code *} is a wildcard. It is
 * not a regular expression.
 *
 * @param inPattern index name or wildcard pattern
 * @return matching index names; empty if none match or the request fails
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2021/10/30-11:54
 **/
public List<String> getAllIndex(String inPattern) {
    GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
    try {
        GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
        return Arrays.asList(getIndexResponse.getIndices());
    } catch (IOException e) {
        // An I/O error means the cluster could not be reached, not that the index exists.
        log.error("获取索引失败 {}", e.getMessage(), e);
    } catch (ElasticsearchStatusException e) {
        log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
    }
    return Collections.emptyList();
}
3.1.3. 删除索引
删除的逻辑比较简单,这里就不多说了。
/**
 * Drops an index along with every document stored in it.
 *
 * @param indexName name of the index to drop
 * @author <a href="https://github.com/rothschil">Sam</a>
 * @date 2019/10/17 17:13
 */
public void deleteIndex(String indexName) {
    try {
        DeleteIndexRequest dropRequest = new DeleteIndexRequest(indexName);
        restHighLevelClient.indices().delete(dropRequest, RequestOptions.DEFAULT);
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}
3.2. 引用依赖
构建一个工程,这里依然用 Gradle 工程作为样例说明,Maven 项目与之类似。
3.2.1. 依赖管理
implementation("io.github.rothschil:persistence-elasticsearch:1.2.3.RELEASE")
3.2.2. 依赖注入
在工程项目中直接将 ElasticIndexManger 作为实例注入进来,后面就可以直接使用它提供的各种方法。样例中我定义了一个精确查询作为说明:TermQueryBuilder("sysCode","crm") 中的参数分别代表匹配条件的列名和列的值;索引名这里用的是通配符,即可以在多个索引之间查询;AccLog.class 是我自定义的类,用于接收查询出来的结果并进行实例化映射。
/**
 * Example consumer of {@link ElasticIndexManger}.
 *
 * <p>It queries access logs whose {@code sysCode} is exactly {@code "crm"}.
 */
@Component
public class LogIndexManager {

    /** Injected index manager that performs the actual search. */
    private ElasticIndexManger elasticIndexManger;

    @Autowired
    public void setElasticIndexManger(ElasticIndexManger elasticIndexManger) {
        this.elasticIndexManger = elasticIndexManger;
    }

    /**
     * Runs a term query on {@code sysCode = "crm"} and maps each hit to {@link AccLog}.
     *
     * <p>The query runs across every index whose name matches {@code hnqymh_hpg*}.
     *
     * @return matching access logs
     */
    public List<AccLog> query() {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(new TermQueryBuilder("sysCode", "crm"));
        return elasticIndexManger.search("hnqymh_hpg*", source, AccLog.class);
    }
}
4. 源码
Github演示源码 ,记得给Star
Gitee演示源码,记得给Star