Elasticsearch Java REST高级客户端-Doc

2020-10-21  本文已影响0人  觉释

版本:Elasticsearch 7.2.0

当你把Elasticsearch升级到7之后,会发现TransportClient这个客户端已过时,而且PreBuiltTransportClient也已被删除,那么现在该如何使用Java客户端呢?Elasticsearch官方建议使用REST高级客户端RestHighLevelClient。话不多说,直接看例子:

  1. 创建客户端、及关闭
  2. 插入一条数据(如果存在则更新) index()
  3. 获取一个文档 get()
  4. 文档是否存在 exists()
  5. 删除文档 delete()
  6. 更新 update()
  7. 批量更新 bulk()
  8. 批量查询 multiGet()
  9. 异步操作
  10. maven配置

1.创建客户端、及关闭

    /**
     * Builds a REST high-level client for the cluster nodes {@code node01},
     * {@code node02} and {@code node03}, each reached over HTTP on port 9200.
     *
     * @return a newly constructed client instance
     */
    private static RestHighLevelClient getClient() {
        HttpHost[] clusterNodes = {
                new HttpHost("node01", 9200, "http"),
                new HttpHost("node02", 9200, "http"),
                new HttpHost("node03", 9200, "http")
        };
        return new RestHighLevelClient(RestClient.builder(clusterNodes));
    }
 
 
/**
     * Closes the given client. An {@link IOException} raised while closing is
     * not propagated; its stack trace is printed instead.
     *
     * @param client the client to close
     */
    private static void close(RestHighLevelClient client) {
        try {
            client.close();
        } catch (IOException closeFailure) {
            // Best-effort shutdown: report the problem and carry on.
            closeFailure.printStackTrace();
        }
    }

2.插入一条数据(如果存在则更新) index()

/**
    * Indexes one document with id {@code "1"} into the {@code user} index and
    * prints the index name and document id taken from the response.
    *
    * <p>If the request fails with an {@link IOException}, the stack trace is
    * printed and nothing else is reported. The client is closed in every case.
    */
   public static void index() {
       RestHighLevelClient client = getClient();
       try {
           Map<String, Object> jsonMap = new HashMap<>();
           jsonMap.put("user", "kimchy");
           jsonMap.put("postDate", new Date());
           jsonMap.put("message", "trying out Elasticsearch");
           IndexRequest request = new IndexRequest("user").id("1").source(jsonMap);

           // Only read the response inside the try block: when index() throws
           // there is no response object to inspect.
           IndexResponse response = client.index(request, RequestOptions.DEFAULT);
           String index = response.getIndex();
           String id = response.getId();
           System.out.println(index + "---" + id);
       } catch (IOException e) {
           e.printStackTrace();
       } finally {
           // Release the client even when an unchecked exception escapes.
           close(client);
       }
   }

3.获取一个文档 get()

/**
     * Fetches the document with id {@code "1"} from the {@code user} index and
     * prints its index, id, version and source in three representations
     * (string, map, raw bytes). Prints a "not found" message when the document
     * does not exist.
     *
     * <p>An {@link IOException} from the request is reported via its stack
     * trace. The client is closed in every case.
     */
    public static void get() {
        RestHighLevelClient client = getClient();
        try {
            GetRequest getRequest = new GetRequest("user", "1");
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
            String index = getResponse.getIndex();
            String id = getResponse.getId();
            if (getResponse.isExists()) {
                long version = getResponse.getVersion();
                String sourceAsString = getResponse.getSourceAsString();
                Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                byte[] sourceAsBytes = getResponse.getSourceAsBytes();

                System.out.println("索引:" + index + ",ID:" + id + "版本:" + version);
                System.out.println(sourceAsString);
                System.out.println(sourceAsMap);
                // byte[].toString() only yields the array identity ("[B@..."),
                // so render the actual byte values instead.
                System.out.println(java.util.Arrays.toString(sourceAsBytes));
            } else {
                System.out.println("未查到结果");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the client even when an unchecked exception escapes.
            close(client);
        }
    }

4.文档是否存在 exists()

/**
    * Checks whether the document with id {@code "1"} exists in the
    * {@code user} index and prints the result.
    *
    * <p>The request is configured to skip both {@code _source} and stored
    * fields, since only the existence flag is needed. An {@link IOException}
    * from the request is reported via its stack trace. The client is closed
    * in every case.
    */
   public static void exists() {
       RestHighLevelClient client = getClient();
       try {
           GetRequest getRequest = new GetRequest("user", "1");
           // Do not fetch _source.
           getRequest.fetchSourceContext(new FetchSourceContext(false));
           // Do not fetch stored fields.
           getRequest.storedFields("_none_");

           boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
           System.out.println(getRequest.index() + "---" + getRequest.id() + "---文档是否存在:" + exists);
       } catch (IOException e) {
           e.printStackTrace();
       } finally {
           // Release the client even when an unchecked exception escapes.
           close(client);
       }
   }

5.删除文档 delete()

/**
    * Deletes the document with id {@code "1"} from the {@code user} index,
    * prints the outcome, and then calls {@code exists()} to show the
    * document's current state.
    *
    * <p>An {@link IOException} from the request is reported via its stack
    * trace. The client is closed in every case.
    */
   public static void delete() {
       RestHighLevelClient client = getClient();
       try {
           DeleteRequest deleteRequest = new DeleteRequest("user", "1");
           // A delete of a missing document is reported through the response
           // (HTTP status 404) rather than an exception, so inspect the status
           // before claiming the document was removed.
           int httpStatus = client.delete(deleteRequest, RequestOptions.DEFAULT).status().getStatus();
           if (httpStatus == 404) {
               System.out.println(deleteRequest.index() + "---" + deleteRequest.id() + ": 文档不存在");
           } else {
               System.out.println(deleteRequest.index() + "---" + deleteRequest.id() + ": 文档已删除");
           }
           exists();
       } catch (IOException e) {
           e.printStackTrace();
       } finally {
           // Release the client even when an unchecked exception escapes.
           close(client);
       }
   }

6.更新 update()

/**
    * Applies a partial update to the document with id {@code "2"} in the
    * {@code user} index, setting the {@code updated} and {@code reason}
    * fields, and prints a confirmation taken from the response.
    *
    * <p>An {@link IOException} from the request is reported via its stack
    * trace. The client is closed in every case, including when the update
    * is rejected with an unchecked exception.
    */
   public static void update() {
       RestHighLevelClient client = getClient();
       try {
           Map<String, Object> jsonMap = new HashMap<>();
           jsonMap.put("updated", new Date());
           jsonMap.put("reason", "daily update");
           UpdateRequest request = new UpdateRequest("user", "2").doc(jsonMap);

           UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
           System.out.println(response.getIndex() + "---" + response.getId() + "完成更新");
       } catch (IOException e) {
           e.printStackTrace();
       } finally {
           // Release the client even when an unchecked exception escapes.
           close(client);
       }
   }

7.批量更新 bulk()

 
    /**
     * Sends one bulk request against the {@code user} index that indexes
     * documents 3, 4 and 5, deletes document 3 and partially updates
     * document 2, then prints the overall status and any per-item failures.
     *
     * <p>An {@link IOException} from the request is reported via its stack
     * trace. The client is closed in every case.
     */
    public static void bulk() {
        RestHighLevelClient client = getClient();
        try {
            BulkRequest request = new BulkRequest();
            request.add(new IndexRequest("user").id("3").source(XContentType.JSON, "field", "foo", "user", "lucky"));
            request.add(new IndexRequest("user").id("4").source(XContentType.JSON, "field", "bar", "user", "Jon"));
            request.add(new IndexRequest("user").id("5").source(XContentType.JSON, "field", "baz", "user", "Lucy"));
            // Removes document 3, which was indexed earlier in this same request.
            request.add(new DeleteRequest("user", "3"));
            request.add(new UpdateRequest("user", "2").doc(XContentType.JSON, "other", "test"));

            // Only read the response inside the try block: when bulk() throws
            // there is no response object to inspect.
            BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
            System.out.println("批量更新结果状态:" + bulkResponse.status());
            // The overall status does not reveal individual item failures,
            // so surface them explicitly.
            if (bulkResponse.hasFailures()) {
                System.out.println(bulkResponse.buildFailureMessage());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the client even when an unchecked exception escapes.
            close(client);
        }
    }

8.批量查询 multiGet()

/**
    * Fetches documents 2, 4 and 5 from the {@code user} index in a single
    * multi-get request and prints the source of each one, or the failure
    * message for any item that could not be retrieved.
    *
    * <p>An {@link IOException} from the request is reported via its stack
    * trace. The client is closed in every case.
    */
   public static void multiGet() {
       RestHighLevelClient client = getClient();
       try {
           MultiGetRequest request = new MultiGetRequest();
           request.add(new MultiGetRequest.Item("user", "2"));
           request.add(new MultiGetRequest.Item("user", "4"));
           request.add(new MultiGetRequest.Item("user", "5"));

           // Only read the response inside the try block: when mget() throws
           // there is no response object to iterate.
           MultiGetResponse mget = client.mget(request, RequestOptions.DEFAULT);

           // Print the query results.
           System.out.println("mget:");
           mget.forEach(item -> {
               if (item.isFailed()) {
                   // A failed item carries no response, only a failure.
                   System.out.println(item.getFailure().getMessage());
               } else {
                   System.out.println(item.getResponse().getSourceAsString());
               }
           });
       } catch (IOException e) {
           e.printStackTrace();
       } finally {
           // Release the client even when an unchecked exception escapes.
           close(client);
       }
   }

9.异步操作

以上做法均为同步操作,而非异步操作,所以这些调用都是阻塞式的,会一直等待,直到查到结果为止。对于异步操作,这里只列出代码,不做详细举例:

// Fragment: `client` and `request` (an IndexRequest) are assumed to be
// defined by the surrounding code.
// The listener's type parameter must match the response type of the async
// call; indexAsync delivers an IndexResponse, not a SearchResponse.
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
   @Override
   public void onResponse(IndexResponse indexResponse) {
       // Called when the index operation completed successfully.
   }

   @Override
   public void onFailure(Exception e) {
       // Called when the index operation failed.
   }
};

client.indexAsync(request, RequestOptions.DEFAULT, listener);

10.maven配置

<dependency>
           <groupId>org.elasticsearch.plugin</groupId>
           <artifactId>x-pack-sql-jdbc</artifactId>
           <version>7.2.0</version>
<!--            <version>6.7.1</version> -->
       </dependency>

       <dependency>
           <groupId>org.elasticsearch.client</groupId>
           <artifactId>elasticsearch-rest-high-level-client</artifactId>
           <version>7.2.0</version>
<!--            <version>6.7.1</version> -->
       </dependency>
上一篇 下一篇

猜你喜欢

热点阅读