日常总结：Elasticsearch 批量入库

2024-06-04  本文已影响0人  灿烂的GL

依赖

        <!-- Spring Boot starter for Spring Data Elasticsearch.
             No <version> is declared here; presumably it is managed by the
             Spring Boot parent/BOM of the project - confirm in the real pom.xml. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

接口

/**
 * Contract for writing a batch of entities to a named index in one bulk call.
 *
 * @param <T>  type of the entities being written
 * @param <ID> identifier type of the entity; not referenced by any method
 *             declared in this interface
 */
public interface BaseElasticsearchRepository<T, ID extends Serializable> {

    /**
     * Submits the given entities as a single bulk operation, synchronously.
     *
     * @param indexName name of the target index
     * @param entities  entities to write
     */
    void bulkSync(String indexName, List<T> entities);

    /**
     * Submits the given entities as a single bulk operation, asynchronously.
     *
     * @param indexName name of the target index
     * @param entities  entities to write
     */
    void bulkAsync(String indexName, List<T> entities);
}

实现

@Slf4j
public class BaseElasticsearchRepositoryImpl<T, ID extends Serializable>  implements BaseElasticsearchRepository<T, ID> {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    private ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkItemResponses) {
            log.info("Bulk indexing is success, cost time ms is : [{}],result={}", bulkItemResponses.getTook().duration(),bulkItemResponses.buildFailureMessage());
        }

        @Override
        public void onFailure(Exception e) {
            log.error("Bulk indexing is failure", e);
            e.printStackTrace();
        }
    };

    @Override
    public void bulkAsync(String indexName, List<T> entities) {
        BulkRequest bulkRequest = new BulkRequest();
        for (T entity : entities) {
            bulkRequest.add(new IndexRequest(indexName)
                    .source(JSON.toJSONString(entity), XContentType.JSON));
        }
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
    }

    @Override
    public void bulkSync(String indexName, List<T> entities) {
        BulkRequest bulkRequest = new BulkRequest();
        for (T entity : entities) {
            bulkRequest.add(new IndexRequest(indexName)
                    .source(JSON.toJSONString(entity), XContentType.JSON));
        }
        try {
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Bulk Sync indexing is failure", e);
        }
    }
}

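使用

让业务 Repository 同时继承 ElasticsearchRepository 和上面自定义的 BaseElasticsearchRepository，即可在原有查询方法之外直接调用 bulkAsync / bulkSync。实现类按 Spring Data 自定义片段的约定命名为“接口名 + Impl”（即 BaseElasticsearchRepositoryImpl），并需位于 Repository 的扫描包路径下。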

/**
 * Repository for {@code TestInfoDO} documents keyed by {@code String}.
 *
 * <p>Combines {@code ElasticsearchRepository} with
 * {@link BaseElasticsearchRepository}, so callers also get
 * {@code bulkAsync} and {@code bulkSync} for {@code TestInfoDO}.
 * Declares no methods of its own.
 */
@Repository
public interface TestInfoRepository
        extends ElasticsearchRepository<TestInfoDO, String>,
                BaseElasticsearchRepository<TestInfoDO, String> {
}
上一篇 下一篇

猜你喜欢

热点阅读