ES High Level Java API

2019-12-15  本文已影响0人  喵星人ZC

官网High Level API

分词

正排索引: doc_id到doc_content的关系

doc_id doc_content
1 大数据培训
2 Spark是一种分布式计算引擎
3 大数据培训有很多

倒排索引（inverted index）: 词项(term)到doc_id列表的映射

term doc_id
大数据培训 1,3
Spark 2
一种 2
分布式 2
计算引擎 2
很多 3

核心概念

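NRT（Near Real Time，近实时）: 文档写入后要经过一次 refresh（默认约 1 秒）才能被搜索到
Cluster: 由 1..n 个 Node 组成
Node: 集群中的一个 ES 实例
Index:     Database
Document:  Row
Type:      Table（6.x 起每个 Index 只允许一个 Type，7.x 起 Type 被废弃）
Field:     Column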

1、获取Client
Pom添加依赖

  <!-- ES Client -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>6.6.2</version>
    </dependency>

单元测试获取Client

package com.bigdata.es;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import sun.font.Script;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;

/**
 * Demo JUnit tests for the Elasticsearch High Level REST Client.
 *
 * <p>They cover indexing a document from a JSON string, a Map, an XContentBuilder and
 * key/value pairs. They also cover get/exists, update, delete, bulk, multi-get, search,
 * and terms/sum aggregations.
 *
 * <p>Every test talks to a live node at {@code hadoop000:9200}. They are not self-contained
 * unit tests, and some depend on data written by others. For example, {@link #getIndex()}
 * reads {@code posts/doc/100}, which {@link #createIndexWithMap()} writes.
 *
 * @author soulChun
 * @create 2020-01-10-20:10
 */
public class ESAppTest {
    /** Client opened in {@link #setUp()} and closed in {@link #tearDown()} for every test. */
    RestHighLevelClient client;

    @Before
    public void setUp() {
        client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("hadoop000", 9200)
        ));
    }

    /** Smoke test: only prints the client instance. */
    @Test
    public void test01() {
        System.out.println(client);
    }

    /** Indexes document posts/doc/1 from a raw JSON string. */
    @Test
    public void createIndexWithJson() throws IOException {
        IndexRequest request = new IndexRequest(
                "posts",
                "doc",
                "1");
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        request.source(jsonString, XContentType.JSON);

        // Send the request synchronously and wait for the response.
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        System.out.println("index: " + indexResponse.getIndex());
        System.out.println("id: " + indexResponse.getId());
    }

    /** Indexes document posts/doc/100 from a Map; the client serializes the values. */
    @Test
    public void createIndexWithMap() throws Exception {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "kimchy");
        jsonMap.put("postDate", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "100")
                .source(jsonMap);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println("index: " + indexResponse.getIndex());
        System.out.println("id: " + indexResponse.getId());
    }

    /** Indexes document posts/doc/200 built with an XContentBuilder. */
    @Test
    public void createIndexWithBuilder() throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        // Equivalent block-style form:
        //   builder.startObject();
        //   {
        //       builder.field("user", "kimchy");
        //       builder.timeField("postDate", new Date());
        //       builder.field("message", "trying out Elasticsearch");
        //   }
        //   builder.endObject();
        builder.startObject().field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "trying out Elasticsearch")
                .endObject();

        IndexRequest indexRequest = new IndexRequest("posts", "doc", "200")
                .source(builder);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println("index: " + indexResponse.getIndex());
        System.out.println("id: " + indexResponse.getId());
    }

    /** Indexes document posts/doc/300 from alternating field-name / value arguments. */
    @Test
    public void createIndexWithSource() throws Exception {
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "300")
                .source("user", "kimchy",
                        "postDate", new Date(),
                        "message", "trying out Elasticsearch");

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println("index: " + indexResponse.getIndex());
        System.out.println("id: " + indexResponse.getId());
    }

    /** Fetches document id 100 from index posts, type doc, and prints its _source. */
    @Test
    public void getIndex() throws Exception {
        GetRequest request = new GetRequest("posts", "doc", "100");
        GetResponse response = client.get(request, RequestOptions.DEFAULT);

        final String data = response.getSourceAsString();
        System.out.println(data);
    }

    /** Checks whether g6/student/1 exists without fetching the document body. */
    @Test
    public void existIndex() throws Exception {
        GetRequest request = new GetRequest("g6", "student", "1");
        // Disable _source and stored fields: only the existence flag is needed.
        request.fetchSourceContext(new FetchSourceContext(false));
        request.storedFields("_none_");

        boolean exist = client.exists(request, RequestOptions.DEFAULT);
        System.out.println(exist);
    }

    /** Partially updates posts/doc/1 by merging the given JSON fields into the document. */
    @Test
    public void updateIndex() throws Exception {
        UpdateRequest request = new UpdateRequest(
                "posts",
                "doc",
                "1");

        String jsonString = "{" +
                "\"updated\":\"2017-01-01\"," +
                "\"reason\":\"daily update\"" +
                "}";

        request.doc(jsonString, XContentType.JSON);

        UpdateResponse updateResponse = client.update(
                request, RequestOptions.DEFAULT);

        System.out.println(updateResponse.getVersion()); // e.g. 4
        System.out.println(updateResponse.getResult()); // e.g. UPDATED
    }

    /** Deletes document es/student/2. */
    @Test
    public void delIndex() throws Exception {
        DeleteRequest request = new DeleteRequest(
                "es",
                "student",
                "2");

        DeleteResponse deleteResponse = client.delete(
                request, RequestOptions.DEFAULT);

        System.out.println(deleteResponse.getVersion()); // e.g. 5
        System.out.println(deleteResponse.getResult()); // e.g. DELETED
    }

    /** Indexes three documents in one bulk request and prints the message of each failed item. */
    @Test
    public void bulkIndex() throws Exception {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON, "field", "foo"));
        request.add(new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "field", "bar"));
        request.add(new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "field", "baz"));

        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

        if (bulkResponse.hasFailures()) {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                // hasFailures() means at least one item failed, not all of them.
                // Successful items have no Failure, so skip them.
                if (bulkItemResponse.isFailed()) {
                    System.out.println(bulkItemResponse.getFailure().getMessage());
                }
            }
        }
    }

    /** Fetches several documents in one request and prints the _source of those that exist. */
    @Test
    public void MulitGetIndex() throws Exception {
        MultiGetRequest request = new MultiGetRequest();
        // request.add(new MultiGetRequest.Item("posts", "doc", "200"));  // exists
        // request.add(new MultiGetRequest.Item("posts", "doc", "400"));  // does not exist
        // NOTE(review): the id here is the empty string; it looks like a leftover, so put a real id here.
        request.add(new MultiGetRequest.Item("es", "student", ""));

        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

        for (MultiGetItemResponse item : response.getResponses()) {
            // A failed item carries a Failure and no GetResponse (getResponse() is null).
            if (item.isFailed()) {
                System.out.println(item.getFailure().getMessage());
                continue;
            }
            GetResponse getResponse = item.getResponse();
            if (getResponse.isExists()) {
                System.out.println(getResponse.getSourceAsString());
            }
        }
        // e.g. {"user":"kimchy","postDate":"2020-01-10T12:52:18.966Z","message":"trying out Elasticsearch"}
    }

    /** Runs a query against posts/doc and prints the total hit count and every hit's _source. */
    @Test
    public void search() throws Exception {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery()); //.from(1).size(5);

        // Alternatives to try in place of (or chained onto) the query above:
        // - sort by a field and keep only docs whose age is between 30 and 31:
        //     .sort("age", SortOrder.ASC)
        //     .postFilter(QueryBuilders.rangeQuery("age").from(30).to(31));
        // - match query: the text is analyzed; with the standard analyzer "tom" does not match "tom2":
        //     searchSourceBuilder.query(QueryBuilders.matchQuery("name", "tom"));
        // - term query: the value is not analyzed and must equal an indexed term exactly:
        //     searchSourceBuilder.query(QueryBuilders.termQuery("name", "tom"));
        // - query_string with a wildcard: names with a term starting with "tom":
        //     searchSourceBuilder.query(QueryBuilders.queryStringQuery("name:tom*"));
        // - multi_match: "tom" in either the name or the msg field (analyzed like match):
        //     searchSourceBuilder.query(QueryBuilders.multiMatchQuery("tom", "name", "msg"));

        SearchRequest searchRequest = new SearchRequest();
        // Restrict to index "posts" and type "doc".
        searchRequest.indices("posts");
        searchRequest.types("doc");
        // Without this the builder is never sent, and nothing configured on it takes effect.
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        final SearchHits hits = searchResponse.getHits();
        System.out.println(hits.getTotalHits());

        for (SearchHit searchHit : hits.getHits()) {
            System.out.println(searchHit.getSourceAsString());
        }
    }

    /**
     * Counts documents per name in index g6.
     * SQL equivalent: {@code select name, count(1) from g6 group by name}.
     */
    @Test
    public void testAgg() throws Exception {
        // Aggregate on the name.keyword sub-field: terms aggregations on an analyzed text field
        // fail by default. Assumes dynamic mapping created name.keyword; confirm against the mapping.
        TermsAggregationBuilder aggregationBuilder =
                AggregationBuilders.terms("by_name").field("name.keyword");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery())
                .aggregation(aggregationBuilder);

        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("g6");
        // Attach the builder, otherwise the aggregation is never sent and getAggregations() is null.
        searchRequest.source(searchSourceBuilder);

        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

        Terms terms = response.getAggregations().get("by_name");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            System.out.println(bucket.getKey() + " : " + bucket.getDocCount());
        }
    }

    /** Groups posts/doc documents by name and prints the sum of the score field per name. */
    @Test
    public void testAgg2() throws Exception {
        // Terms bucket per name, with a sum(score) metric nested inside each bucket.
        TermsAggregationBuilder nameAgg = AggregationBuilders.terms("by_name").field("name.keyword");
        SumAggregationBuilder scoreAgg = AggregationBuilders.sum("by_score").field("score");
        nameAgg.subAggregation(scoreAgg);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery())
                .aggregation(nameAgg);

        SearchRequest searchRequest = new SearchRequest();
        // Restrict to index "posts" and type "doc".
        searchRequest.indices("posts");
        searchRequest.types("doc");
        // Attach the builder, otherwise the aggregation is never sent and getAggregations() is null.
        searchRequest.source(searchSourceBuilder);

        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

        Terms terms = response.getAggregations().get("by_name");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            Sum sum = bucket.getAggregations().get("by_score");
            System.out.println(bucket.getKey() + " : " + sum.getValue());
        }
    }

    @After
    public void tearDown() throws IOException {
        if (client != null) {
            client.close();
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读