spring bootJAVA

SpringBoot集成Elasticsearch 实战(1)

2021-12-27  本文已影响0人  孤山之王

1. 目录

20201230114440

2. SpringBoot集成

开发工具,这里选择的是IDEA 2021.1.2,构建 Gradle 工程等一堆通用操作,不清楚的自行百度 或者 参看 90分钟玩转Gradle

2.1. 依赖配置

我这边选择 spring-boot-starter-data-elasticsearch 方式来集成 spring-boot 中集成的版本号与实际安装版本号的差异,尽量选择一致的版本,否则在集成过程中,会有莫名的问题。读者在选择的时候多加留意。


api("org.springframework.boot:spring-boot-starter-data-elasticsearch")

我在此基础上封装一层 persistence-elasticsearch,更贴近一般项目使用。

中央仓库 20211227173753

2.2. 核心操作类

为了规范索引管理,这里将所有的操作都封装成一个基类,实现对索引的增删改查。同时还集成了对数据的单个以及批量的插入以及删除。避免针对每个索引都自己写一套实现,杜绝代码的冗余,同时这样的集成对代码的结构本身也是低侵入性。


public abstract class AbstractElasticIndexManger {

    protected ElasticsearchRestTemplate elasticsearchRestTemplate;

    protected RestHighLevelClient restHighLevelClient;

    @Autowired
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    @Autowired
    public void setElasticsearchRestTemplate(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
    }

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }

    /**
     * 查询匹配条件的数据量,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected long count(BoolQueryBuilder builder, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        return elasticsearchRestTemplate.count(nativeSearchQueryBuilder.build(), IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits search(BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(1, 20);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page       当前页
     * @param size       每页大小
     * @param builder    BoolQueryBuilder类型查询实例
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        nativeSearchQueryBuilder.withQuery(builder);
        Pageable pageable = PageRequest.of(page, size);
        nativeSearchQueryBuilder.withPageable(pageable);
        return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
    }

    protected DeleteByQueryRequest builderDeleteRequest(QueryBuilder builder, String... indexNames) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setBatchSize(0X5F5E0FF);
        request.setConflicts("proceed");
        return request;
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param builder    BoolQueryBuilder类型查询实例
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected BulkByScrollResponse update(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        UpdateByQueryRequest request = buildUpdateByQueryReq(params, builder, indexNames);
        try {
            return restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 构建更新 QueryRequest
     *
     * @param params     参数
     * @param builder    布尔构建
     * @param indexNames 索引
     * @return UpdateByQueryRequest
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-15:50
     **/
    protected UpdateByQueryRequest buildUpdateByQueryReq(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
        Script script = buildScriptType(params);
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexNames);
        request.setQuery(builder);
        request.setScript(script);
        request.setConflicts("proceed");
        request.setRefresh(true);
        request.setTimeout(TimeValue.timeValueMinutes(3));
        return request;
    }

    /**
     * 以 K-V键值对 方式构建条件 Script
     *
     * @param params Map形式的 字段名 和 字段内容 组成的条件
     * @return Script
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-13:19
     **/
    protected Script buildScriptType(Map<String, Object> params) {
        Set<String> keys = params.keySet();
        StringBuffer idOrCodeStb = new StringBuffer();
        for (String key : keys) {
            idOrCodeStb.append("ctx._source.").append(key).append("=params.").append(key).append(";");
        }
        ScriptType type = ScriptType.INLINE;
        return new Script(type, Script.DEFAULT_SCRIPT_LANG, idOrCodeStb.toString(), params);
    }

    /**
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void setBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        mustBuilders(builder, bool);
        mustNotBuilders(builder, bool);
        shouldBuilders(builder, bool);
        filterBuilders(builder, bool);
    }

    /**
     * 构建满足 必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> must = bool.getMust();
        if (must.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : must) {
            builder.must(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 非必须 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void mustNotBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> mustNot = bool.getMustNot();
        if (mustNot.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : mustNot) {
            builder.mustNot(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 可选 条件 的方法
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void shouldBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> should = bool.getShould();
        if (should.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : should) {
            builder.should(getQueryBuilder(cds));
        }
    }

    /**
     * 构建满足 必须 条件 的方法,推荐使用
     *
     * @param builder BoolQueryBuilder
     * @param bool    布尔类条件
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/28-14:45
     **/
    protected void filterBuilders(BoolQueryBuilder builder, BoolCondition bool) {
        List<AtomicCondition> filter = bool.getFilter();
        if (filter.isEmpty()) {
            return;
        }
        for (AtomicCondition cds : filter) {
            builder.filter(getQueryBuilder(cds));
        }
    }

    public QueryBuilder getQueryBuilder(AtomicCondition cds) {
        QueryBuilder queryBuilder;
        Tuple tuple = cds.getTuple();
        switch (cds.getStatus()) {
            case (Constants.SUFFIX_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.MULTI_CHARACTER + tuple.v1());
                break;
            case (Constants.SUFFIX_SINGLE_QUERY):
                queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.SINGLE_CHARACTER + tuple.v1());
                break;
            case (Constants.RANGE_QUERY):
                queryBuilder = QueryBuilders.rangeQuery(cds.getField()).from(tuple.v1()).to(tuple.v2());
                break;
            case (Constants.PREFIX_QUERY):
                queryBuilder = QueryBuilders.prefixQuery(cds.getField(), tuple.v1().toString());
                break;
            case (Constants.REG_QUERY):
                queryBuilder = QueryBuilders.regexpQuery(cds.getField(), tuple.v1().toString());
                break;
            default:
                queryBuilder = QueryBuilders.termQuery(cds.getField(), tuple.v1().toString());
                break;
        }
        return queryBuilder;
    }
}



public class ElasticIndexManger extends AbstractElasticIndexManger {


    /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

    /**
     * 制定配置项的判断索引是否存在,注意与 isExistsIndex 区别
     * <ul>
     *     <li>1、可以指定 用本地检索 还是用 主动节点方式检索</li>
     *     <li>2、是否适应被人读的方式</li>
     *     <li>3、返回默认设置</li>
     * </ul>
     *
     * @param indexName index名
     * @return boolean
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean existIndex(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        //TRUE-返回本地信息检索状态,FALSE-还是从主节点检索状态
        request.local(false);
        //是否适应被人可读的格式返回
        request.humanReadable(true);
        //是否为每个索引返回所有默认设置
        request.includeDefaults(false);
        //控制如何解决不可用的索引以及如何扩展通配符表达式,忽略不可用索引的索引选项,仅将通配符扩展为开放索引,并且不允许从通配符表达式解析任何索引
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 单纯断某个索引是否存在
     *
     * @param indexName index名
     * @return boolean 存在为True,不存在则 False
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:27
     */
    public boolean isIndexExists(String indexName) throws Exception {
        return restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
    }

    /**
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,此处对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     *
     * @param indexName index
     * @param list      列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(String indexName, List<? extends BasePo> list) throws IOException {
        int sleep = 15;
        BulkRequest request = new BulkRequest();
        list.forEach(item -> request.add(new IndexRequest(indexName)
                .id(item.getId().toString())
                .source(JSON.toJSONString(item), XContentType.JSON)));
        try {
            BulkResponse bulkResponse = bulk(request);
            log.error("[Verification BulkResponse bulk 操作结果] {}, 文件大小 {} ", bulkResponse.status(), list.size());
            if (bulkResponse.hasFailures()) {
                log.error(bulkResponse.buildFailureMessage());
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        log.error(bulkItemResponse.getFailureMessage());
                    }
                }
                log.error("批量操作失败,重新再提交一次,间隔时间{}, 文件大小 {} ", sleep, list.size());
                TimeUnit.SECONDS.sleep(sleep);
                bulkResponse = bulk(request);
                if (bulkResponse.hasFailures()) {
                    log.error("再次提交失败,需要写入MQ , 文件大小 {} ", list.size());
                }
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * bulk 方式批量提交
     *
     * @param request {@link BulkRequest} 请求
     * @return BulkResponse
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/24-15:50
     **/
    private BulkResponse bulk(BulkRequest request) throws IOException {
        return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    }

    /**
     * <p>
     * 批量插入数据,通过 {@link List} 的对象集合进行插入,提交前,判断 该索引是否存在不存在则直接创建 该索引
     * 并对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
     * </p>
     *
     * @param indexName index
     * @param list      列表
     * @param created   当索引不存在,则创建索引,默认为 true,即索引不存在,创建该索引,此时 mapping 应该不为空
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:26
     */
    public void batch(List<? extends BasePo> list, String indexName, boolean created, String mapping) throws Exception {
        try {
            if (!isIndexExists(indexName)) {
                log.error("[Index does not exist] Rebuilding index. IndexName ={}", indexName);
                if (created && StringUtils.isNotBlank(mapping)) {
                    createIndex(indexName, mapping);
                } else {
                    log.error("[Index does not exist , No index creation] IndexName ={}", indexName);
                    return;
                }
            }
            batch(indexName, list);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 批量删除,根据索引名称,删除索引下数据
     *
     * @param indexName index
     * @param idList    待删除列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> void deleteBatch(String indexName, Collection<T> idList) {
        BulkRequest request = new BulkRequest();
        idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
        try {
            restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 根据索引名称,和 {@link SearchSourceBuilder} 条件,以及返回对象实体类,返回列表
     *
     * @param indexName index
     * @param builder   查询参数
     * @param clazz     结果类对象
     * @return java.util.List<T>
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:14
     */
    public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> clazz) {
        List res = Collections.EMPTY_LIST;
        try {
            SearchRequest request = new SearchRequest(indexName);
            request.source(builder);
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), clazz));
            }
        } catch (IOException e) {
            log.error("[ElasticSearch] connect err ,err-msg {}", e.getMessage());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除索引下数据,但是不删除索引结构
     *
     * @param builder    条件构建模式
     * @param indexNames 索引名称列表
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteByQuery(QueryBuilder builder, String... indexNames) {
        try {
            DeleteByQueryRequest request = builderDeleteRequest(builder, indexNames);
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 不推荐使用,原因为不够灵活,获取该索引下可以匹配的数量,支持 模糊查询和精确查询,
     * 用法 在 方法 <b>field</b> 的处理上。
     * <ul>
     *     <li>模糊匹配模式:字段</li>
     *     <li>精确匹配模式:字段.类型</li>
     * </ul>
     *
     * @param indexName 文档索引名
     * @param field     字段
     * @param text      内容
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2018/07/20-20:47
     **/
    @Deprecated
    public long countMatchPhrasePrefixQuery(String indexName, String field, String text) {
        CountRequest countRequest = new CountRequest(indexName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(matchPhrasePrefixQuery(field, text));
        countRequest.source(searchSourceBuilder);
        CountResponse countResponse = null;
        try {
            countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return countResponse == null ? 0L : countResponse.getCount();
    }


    /**
     * 按照字段 内容进行精确匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param content    内容
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long exactCondition(String field, String content, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.termQuery(field, content));
        return count(builder, indexNames);
    }


    /**
     * 按照字段的前缀内容进行匹配,返回匹配的数量
     *
     * @param field      字段名
     * @param prefix     前缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:49
     **/
    public long prefix(String field, String prefix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        return count(builder, indexNames);
    }


    /**
     * 按照字段对 内容进行后缀匹配,返回匹配的数量
     *
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:56
     **/
    public long suffix(String field, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 字段的前缀和后缀都必须满足条件
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixAndSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀和后缀都满足一个条件按即可
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixOrSuffix(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀必须满足,而 后缀则不要求 不一定满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixMustSuffixShould(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.must(QueryBuilders.prefixQuery(field, prefix));
        builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }

    /**
     * 字段的前缀选择性满足,而 后缀则一定要满足
     *
     * @param field      字段
     * @param prefix     前缀
     * @param suffix     后缀
     * @param indexNames 索引名
     * @return long 数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-10:59
     **/
    public long prefixShouldSuffixMust(String field, String prefix, String suffix, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        builder.should(QueryBuilders.prefixQuery(field, prefix));
        builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
        return count(builder, indexNames);
    }


    /**
     * 查询总数
     *
     * @param indexNames 索引文档名称,可以是多个
     * @return long 匹配的数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/29-21:11
     **/
    public long total(String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        return count(builder, indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param params     Map形式的 字段名 和 字段内容 组成的条件
     * @param bool      复合条件封装
     * @param indexNames 索引名,可以一次性查询多个
     * @return long 最终数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    public BulkByScrollResponse update(Map<String, Object> params, BoolCondition bool, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return update(params,builder,indexNames);
    }

    /**
     * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
     *
     * @param page    当前页
     * @param size    每页大小
     * @param clazz      Class对象
     * @param indexNames 索引名,可以一次性查询多个
     * @return SearchHits 命中结果的数据集
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/11/1-9:26
     **/
    protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolCondition bool,Class<? extends BasePo> clazz, String... indexNames) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        setBuilders(builder,bool);
        return searchPage(page,size,builder, clazz, indexNames);
    }

}

3. 项目代码

通过以上的集成,我们看到完成在项目中对 elasticsearch 的集成,同时也用基类,将所有可能的操作都封装起来。下来我们通过对基类的讲解,来逐个说明!

3.1. 索引管理

由于在ElasticIndexManger类定义了所有方法,直接调用即可。

3.1.1. 创建索引

我们在创建索引过程中需要先判断是否有这个索引,否则不允许创建,由于我案例采用的是手动指定 indexNameSettings ,大家看的过程中要特别注意下,而且还有一点 indexName 必须是小写,如果是大写在创建过程中会有错误

官方索引创建说明 索引名大写

。详细的代码实现见如下:


 /**
     * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping) {
        createIndex(indexName, mapping, 0, 1);
    }


    /**
     * 指定索引结构创建索引
     *
     * @param indexName 索引名称
     * @param mapping   索引定义,JSON形式的字符串
     * @param replicas  副本的数量
     * @param shards    分片数量
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:30
     */
    public void createIndex(String indexName, String mapping, int replicas, int shards) {
        try {
            if (!this.existIndex(indexName)) {
                log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                return;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            buildSetting(request, replicas, shards);
            request.mapping(mapping, XContentType.JSON);
            CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            if (!res.isAcknowledged()) {
                throw new RuntimeException("初始化失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
    }

创建索引需要设置分片,这里采用Settings.Builder方式,当然也可以 JSON 自定义方式,本文篇幅有限,不做演示。

index.number_of_shards:分片数

number_of_replicas:副本数

    /**
     * 设置分片 和 副本
     * 副本作用主要为了保证数据安全
     *
     * @param request 请求
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 19:27
     */
    protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
        request.settings(Settings.builder().put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas));
    }


[elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"

health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size

yellow open   twitter      scSSD1SfRCio4F77Hh8aqQ   3   2          2            0      8.3kb          8.3kb

yellow open   idx_location _BJ_pOv0SkS4tv-EC3xDig   3   2          1            0        4kb            4kb

yellow open   wongs        uT13XiyjSW-VOS3GCqao8w   3   2          1            0      3.4kb          3.4kb

yellow open   idx_locat    Kr3wGU7JT_OUrRJkyFSGDw   3   2          3            0     13.2kb         13.2kb

yellow open   idx_copy_to  HouC9s6LSjiwrJtDicgY3Q   3   2          1            0        4kb            4kb
  

说明创建成功,这里总是通过命令行来验证,有点繁琐,既然我都有WEB服务,为什么不直接通过HTTP验证了?

3.1.2. 查看索引

查看索引这个操作支持模糊操作,即以通配符 * 作为一个或者多个字符匹配,这个操作在实际应用非常好用,将来有机会说到 Index 设计过程中就显得尤为重要。


    /**
     * 获取所有索引,默认为所有索引
     *
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List getAllIndex() {
        return getAllIndex(Constants.MULTI_CHARACTER);
    }

    /**
     * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
     *
     * @param inPattern 正则表达式
     * @return List
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2021/10/30-11:54
     **/
    public List<String> getAllIndex(String inPattern) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
        try {
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            String[] indices = getIndexResponse.getIndices();
            return Arrays.asList(indices);
        } catch (IOException e) {
            log.error("获取索引失败 {} 已经存在", e.getMessage());
        } catch (ElasticsearchStatusException e) {
            log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
        }
        return Collections.EMPTY_LIST;
    }

3.1.3. 删除索引

删除的逻辑就比较简单,这里就不多说。


    /**
     * 删除 index,以及索引下数据
     *
     * @param indexName 索引名字
     * @author <a href="https://github.com/rothschil">Sam</a>
     * @date 2019/10/17 17:13
     */
    public void deleteIndex(String indexName) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

3.2. 引用依赖

构建一个工程,我这里依然用 Gralde 工程作为样例说明, Maven 项目类似。

3.2.1. 依赖管理


implementation("io.github.rothschil:persistence-elasticsearch:1.2.3.RELEASE")

3.2.2. 依赖注入

在工程项目中直接使用 ElasticIndexManger 作为实例注入进来,后面我们可以直接使用它提供的各种方法。样例中我是定义一个精确查询作为说明,TermQueryBuilder("sysCode","crm") 中参数分别代表匹配条件的列名和列的值; 在索引列名中我这里用的是 通配符,即可以在多个索引之间查询; AccLog.class 这是我自定义的类,用以接收查询出来的结果进行实例化映射。


@Component
public class LogIndexManager{

    private ElasticIndexManger elasticIndexManger;

    @Autowired
    public void setElasticIndexManger(ElasticIndexManger elasticIndexManger) {
        this.elasticIndexManger = elasticIndexManger;
    }

    public List<AccLog> query(){
        QueryBuilder queryBuilder = new TermQueryBuilder("sysCode","crm");
        SearchSourceBuilder sb = new SearchSourceBuilder();
        sb.query(queryBuilder);
        return elasticIndexManger.search("hnqymh_hpg*",sb,AccLog.class);
    }
}

![查询结果]](https://img.haomeiwen.com/i7232803/c3509fce823f59ea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

4. 源码

Github演示源码 ,记得给Star

Gitee演示源码,记得给Star

上一篇下一篇

猜你喜欢

热点阅读