RestHighLevelClient操作ES
2022-08-31 本文已影响0人
Nzkalhbxx
使用RestHighLevelClient的前提是能够熟练地通过kibana的requestBody的方式操作es的增删改查, 基础扎实, 使用api会更加容易
以下示例以es 6.8.3为基础环境,所有api操作针对的是6.x版本的es;7.x版本之后弱化(废弃)了类型type,api也会有所改变,实际使用时请根据具体的es版本做相应的调整
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.es</groupId>
    <artifactId>restclienttest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>restclienttest</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <!-- Single place to change the Elasticsearch client/server library version. -->
        <elasticsearch.version>6.8.3</elasticsearch.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- JUnit 4 is only relevant to test code, so keep it off the runtime classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!--
          Pin the low-level REST client to the same release as the high-level client.
          Without an explicit version the imported spring-boot-dependencies BOM manages this
          transitive artifact at Spring Boot's own Elasticsearch version (a 7.x release for
          Spring Boot 2.3.x - verify with `mvn dependency:tree`), which mixes client versions.
        -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!-- JSON serialization utility -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!-- commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        <!-- Same reason as the REST client pin above: keep the core library on the client's version. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- Keep the plugin on the same release as the imported BOM. -->
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.es.restclienttest.RestclienttestApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
# Application name
spring.application.name=restclienttest
# URI scheme used for every cluster node (EsRestClientConfiguration falls back to "http" when this key is absent)
elasticsearch.schema=http
# Cluster address in host:port form; separate multiple nodes with ","
elasticsearch.address=192.168.146.132:9200
# Connect timeout handed to the HTTP client's request config (milliseconds)
elasticsearch.connectTimeout=5000
# Socket timeout handed to the HTTP client's request config (milliseconds)
elasticsearch.socketTimeout=5000
# Timeout for obtaining a connection from the connection manager (milliseconds)
elasticsearch.connectionRequestTimeout=5000
# Maximum total number of connections
elasticsearch.maxConnectNum=100
# Maximum number of connections per route
elasticsearch.maxConnectPerRoute=100
配置类
package com.es.restclienttest.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring configuration that builds the application's {@link RestHighLevelClient}
 * from the {@code elasticsearch.*} properties.
 */
@Configuration
public class EsRestClientConfiguration {

    /** Port assumed for an address entry that carries no explicit {@code :port} suffix. */
    private static final int DEFAULT_PORT = 9200;

    /** URI scheme applied to every node; falls back to {@code http} when the property is not set. */
    @Value("${elasticsearch.schema:http}")
    private String schema;

    /** Cluster address list in {@code host:port} form; multiple nodes are separated by {@code ","}. */
    @Value("${elasticsearch.address}")
    private String address;

    /** Connect timeout passed to the request config. */
    @Value("${elasticsearch.connectTimeout}")
    private int connectTimeout;

    /** Socket timeout passed to the request config. */
    @Value("${elasticsearch.socketTimeout}")
    private int socketTimeout;

    /** Timeout for obtaining a connection, passed to the request config. */
    @Value("${elasticsearch.connectionRequestTimeout}")
    private int connectionRequestTimeout;

    /** Maximum total number of connections. */
    @Value("${elasticsearch.maxConnectNum}")
    private int maxConnectNum;

    /** Maximum number of connections per route. */
    @Value("${elasticsearch.maxConnectPerRoute}")
    private int maxConnectPerRoute;

    /**
     * Creates the high-level REST client for all configured nodes, applying the
     * configured timeouts and connection limits.
     *
     * @return the client bean registered as {@code restHighLevelClient}
     * @throws IllegalArgumentException if {@code elasticsearch.address} contains no usable
     *         entry or an entry has a non-numeric port
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(parseHosts());

        // Timeouts applied to each request issued through the client.
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .setConnectionRequestTimeout(connectionRequestTimeout));

        // Connection limits of the underlying HTTP client.
        builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                .setMaxConnTotal(maxConnectNum)
                .setMaxConnPerRoute(maxConnectPerRoute));

        return new RestHighLevelClient(builder);
    }

    /**
     * Turns the comma-separated {@link #address} property into hosts.
     * Surrounding whitespace and empty entries are ignored; an entry without a port
     * uses {@link #DEFAULT_PORT}.
     */
    private HttpHost[] parseHosts() {
        List<HttpHost> hosts = new ArrayList<>();
        for (String entry : address.split(",")) {
            String node = entry.trim();
            if (node.isEmpty()) {
                // Tolerate "a:9200,,b:9200" and trailing commas.
                continue;
            }
            String[] parts = node.split(":");
            // split() returns an empty array for ":"; the blank host is then rejected below.
            String host = parts.length > 0 ? parts[0].trim() : "";
            if (host.isEmpty()) {
                throw new IllegalArgumentException(
                        "Missing host in elasticsearch.address entry '" + node + "'");
            }
            hosts.add(new HttpHost(host, parsePort(parts, node), schema));
        }
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("elasticsearch.address does not contain any host");
        }
        return hosts.toArray(new HttpHost[0]);
    }

    /** Reads the port from a split {@code host:port} entry, defaulting when it is absent. */
    private static int parsePort(String[] parts, String node) {
        if (parts.length < 2) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(parts[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid port in elasticsearch.address entry '" + node + "'", e);
        }
    }
}
测试类
package com.es.restclienttest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.es.restclienttest.bean.Book;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
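* The examples below target Elasticsearch 6.8; on newer versions the API may differ.
* For example, indices created on 6.x can already hold only a single mapping type, and
* from 7.x on the type is deprecated in the APIs altogether.
* Adapt as needed - the overall design stays largely the same.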
*/
@SpringBootTest(classes = RestclienttestApplication.class)
//@RunWith(SpringRunner.class)
class RestclienttestApplicationTests {
// 注入客户端对象, 使用的是9200端口(http协议), 9300使用的是tcp协议
@Autowired
RestHighLevelClient restHighLevelClient;
@Test
void contextLoads() {
    // Smoke test: prints the injected client bean.
    System.out.println(String.valueOf(restHighLevelClient));
}
/**
 * Checks whether the {@code dangdang} index exists and prints the answer.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void existsIndex() throws IOException {
    // "indices" is the noun (the indexes); "index" on the client is mostly the verb (index a document).
    GetIndexRequest request = new GetIndexRequest().indices("dangdang");
    boolean found = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    System.out.println("exists: " + found);
}
/**
 * Deletes the {@code dangdang} index and prints whether the request was acknowledged.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void deleteIndex() throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("dangdang");
    AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
    System.out.println(response.isAcknowledged());
}
/**
 * Creates the {@code dangdang} index with one shard, one replica and a {@code books}
 * mapping, then prints the response and whether it was acknowledged.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void createIndex() throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("dangdang");
    request.settings(Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 1)
            .build());

    String bookMappingJsonString = "{\"properties\":{\"id\":{\"type\":\"keyword\"},\"name\":{\"type\":\"keyword\"},\"desc\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},\"price\":{\"type\":\"double\"}}}";
    request.mapping("books", bookMappingJsonString, XContentType.JSON);

    // Pass RequestOptions like every other call here; the overload without options is the
    // deprecated Header... variant.
    // The result is held as AcknowledgedResponse so this code does not depend on which
    // CreateIndexResponse class the overload for this request type returns.
    AcknowledgedResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
    System.out.println(response);
    System.out.println(response.isAcknowledged());
}
/**
 * Fetches the {@code dangdang} index information and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void getIndex() throws IOException {
    GetIndexRequest request = new GetIndexRequest().indices("dangdang");
    GetIndexResponse response = restHighLevelClient.indices().get(request, RequestOptions.DEFAULT);
    System.out.println(response);
    // Retrieved only to show the accessor; the value is not used further.
    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = response.getMappings();
}
/**
 * Checks whether document {@code 1} of type {@code books} exists in {@code dangdang}
 * and prints the answer.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void existsDoc() throws IOException {
    GetRequest request = new GetRequest("dangdang", "books", "1");
    // exists() only yields true/false, so skip _source and stored fields to keep the request light.
    request.fetchSourceContext(new FetchSourceContext(false));
    request.storedFields("_none_");

    boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
    System.out.println(exists);
}
/**
 * Indexes a single {@code Book} document, using the book id as the document id,
 * and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void addDoc() throws IOException {
    Book book = new Book("001", "bookname1", 11.11, "book1 desc 哈哈哈");

    IndexRequest request = new IndexRequest("dangdang", "books");
    request.id(book.getId());
    request.source(JSONObject.toJSONString(book), XContentType.JSON);

    IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Indexes two {@code Book} documents with one bulk request.
 * The index and type are set once on the bulk request rather than on each item.
 *
 * @throws IOException if the request cannot be executed
 * @throws IllegalStateException if any item of the bulk request failed
 */
@Test
void bulkAddDoc() throws IOException {
    BulkRequest bulkRequest = new BulkRequest("dangdang", "books");

    Book[] books = {
        new Book("bulk001", "bulkbookname1", 11.11, "bulk book1 desc 哈哈哈"),
        new Book("bulk002", "bulkbookname2", 21.11, "bulk book2 desc 哈哈哈")
    };
    for (Book book : books) {
        IndexRequest item = new IndexRequest();
        item.id(book.getId());
        item.source(JSONObject.toJSONString(book), XContentType.JSON);
        bulkRequest.add(item);
    }

    BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    // Item failures are reported in the response, so they must be checked explicitly.
    if (bulkResponse.hasFailures()) {
        throw new IllegalStateException(bulkResponse.buildFailureMessage());
    }
    System.out.println("bulk index items: " + bulkResponse.getItems().length);
}
/**
 * Updates a document by {@code _id}, sending a JSON-serialized {@code Book} as the partial document.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void updateDoc() throws IOException {
    Book patch = new Book();
    patch.setId("001");
    patch.setDesc("修改后的描述book1");

    // NOTE(review): the whole bean is serialized. If Book has primitive fields (e.g. a
    // double price), their default values would be part of the update - confirm against Book.
    UpdateRequest request = new UpdateRequest("dangdang", "books", patch.getId());
    request.doc(JSON.toJSONString(patch), XContentType.JSON);

    restHighLevelClient.update(request, RequestOptions.DEFAULT);
}
/**
 * Updates the {@code name} field of two documents (addressed by {@code _id}) with one bulk request.
 *
 * @throws IOException if the request cannot be executed
 * @throws IllegalStateException if any item of the bulk request failed
 */
@Test
void bulkUpdateDoc() throws IOException {
    // Each row is {document id, new name}.
    String[][] renames = {
        {"bulk001", "bulkupdateName1"},
        {"bulk002", "bulkupdateName2"}
    };

    BulkRequest bulkRequest = new BulkRequest();
    for (String[] rename : renames) {
        Map<String, Object> partialDoc = new HashMap<>();
        partialDoc.put("name", rename[1]);

        UpdateRequest item = new UpdateRequest("dangdang", "books", rename[0]);
        item.doc(partialDoc, XContentType.SMILE);
        bulkRequest.add(item);
    }

    BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    // Item failures are reported in the response, so they must be checked explicitly.
    if (bulkResponse.hasFailures()) {
        throw new IllegalStateException(bulkResponse.buildFailureMessage());
    }
    System.out.println("bulk update items: " + bulkResponse.getItems().length);
}
/**
 * Updates every document matched by a bool query through a painless script:
 * sets {@code price} to 11 and appends a parameter value to {@code desc}.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void updateDocByQuery() throws IOException {
    UpdateByQueryRequest request = new UpdateByQueryRequest();
    request.indices("dangdang");
    request.setDocTypes("books");

    // When should is combined with must/filter, minimum_should_match defaults to 0,
    // so it is set explicitly to make the should clause mandatory.
    BoolQueryBuilder query = QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("name", "bookname1"))
            .must(QueryBuilders.rangeQuery("price").gte(10).lte(12))
            .should(QueryBuilders.idsQuery("books").addIds("I13i0oIBuvWLJglaqiJQ"))
            .minimumShouldMatch(1);
    request.setQuery(query);

    Map<String, Object> data = new HashMap<>();
    data.put("descAdd", "增加---");
    Map<String, Object> params = new HashMap<>();
    params.put("data", data);
    String source = "ctx._source.price=11;ctx._source.desc+=params.data.descAdd";
    request.setScript(new Script(ScriptType.INLINE, "painless", source, params));

    BulkByScrollResponse response = restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
    System.out.println(response);
    // The client is an injected, shared bean: it is deliberately not closed here, because
    // closing it would break every later test that uses the same instance.
}
/**
 * Deletes a document by {@code _id} and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void deleteDoc() throws IOException {
    Book book = new Book();
    book.setId("001");

    DeleteRequest request = new DeleteRequest("dangdang", "books", book.getId());
    // Use the RequestOptions overload like the other calls; the variant without options is
    // the deprecated Header... overload.
    DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Deletes two documents by {@code _id} with one bulk request and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void bulkDeleteDoc() throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    for (String id : new String[] {"bulk001", "bulk002"}) {
        bulkRequest.add(new DeleteRequest("dangdang", "books", id));
    }

    BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Deletes every document in {@code dangdang} matched by a bool query and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void deleteDocByQuery() throws IOException {
    BoolQueryBuilder query = QueryBuilders.boolQuery()
            .mustNot(QueryBuilders.termQuery("name", "bookname1"))
            .should(QueryBuilders.matchQuery("desc", "没有"))
            .should(QueryBuilders.idsQuery("books").addIds("IV3Q0oIBuvWLJglaoiJt"));

    DeleteByQueryRequest request = new DeleteByQueryRequest("dangdang");
    request.setQuery(query);

    BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Fetches a single document by {@code _id} and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void getIdQuery() throws IOException {
    GetRequest request = new GetRequest("dangdang", "books", "bulk001");
    GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Runs a match_all search on {@code dangdang}/{@code books}: first page of two hits,
 * sorted by {@code price} descending. Prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void matchAllQuery() throws IOException {
    SearchSourceBuilder source = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .from(0)
            .size(2)
            .sort("price", SortOrder.DESC);

    SearchRequest request = new SearchRequest("dangdang").types("books").source(source);

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Runs a term query on the {@code desc} field of {@code dangdang}/{@code books}
 * and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void termQuery() throws IOException {
    // NOTE(review): a term query is not analyzed. With desc mapped as analyzed text (see
    // createIndex), it only hits when the analyzer produced exactly this token - confirm.
    SearchSourceBuilder source = new SearchSourceBuilder()
            .query(QueryBuilders.termQuery("desc", "哈哈"));

    SearchRequest request = new SearchRequest("dangdang").types("books").source(source);

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Runs a match query on the {@code desc} field of {@code dangdang}/{@code books}
 * and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void matchQuery() throws IOException {
    MatchQueryBuilder descMatch = QueryBuilders.matchQuery("desc", "哈哈哈");
    SearchSourceBuilder source = new SearchSourceBuilder().query(descMatch);

    SearchRequest request = new SearchRequest("dangdang").types("books").source(source);

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Runs a bool query (two must clauses plus one mandatory should clause) on
 * {@code dangdang}/{@code books} and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void boolQuery() throws IOException {
    BoolQueryBuilder query = QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("id", "bulk001"))
            .must(QueryBuilders.matchQuery("desc", "哈哈哈"))
            .should(QueryBuilders.multiMatchQuery("哈哈", "name", "desc"))
            // Combined with must, should is optional by default; require one should match.
            .minimumShouldMatch(1);

    // Scope the search to the demo index and type like the other searches in this class;
    // a SearchRequest without indices would query every index.
    SearchRequest request = new SearchRequest("dangdang")
            .types("books")
            .source(new SearchSourceBuilder().query(query));

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
/**
 * Runs a bool query whose only clause is a filter (a match on {@code desc}) on
 * {@code dangdang}/{@code books} and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
void filterlQuery() throws IOException {
    BoolQueryBuilder query = QueryBuilders.boolQuery()
            .filter(QueryBuilders.matchQuery("desc", "人"));

    // Scope the search to the demo index and type like the other searches in this class;
    // a SearchRequest without indices would query every index.
    SearchRequest request = new SearchRequest("dangdang")
            .types("books")
            .source(new SearchSourceBuilder().query(query));

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
}