Elasticsearch: Building Aggregation Searches with the Java API, and ES SQL

2020-11-22  小波同学

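Version

Different versions of elasticsearch-rest-high-level-client and Elasticsearch are not guaranteed to be compatible. Make sure the client version matches the Elasticsearch server version, otherwise unpredictable errors may occur.

ES configuration

Maven dependencies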

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>

Configure application.properties

spring.application.name=service-search

# separate multiple nodes with commas
elasticsearch.hostlist=127.0.0.1:9200

Create the configuration class ElasticsearchConfig

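import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    // Comma-separated list of host:port entries from application.properties
    @Value("${elasticsearch.hostlist}")
    private String hostlist;

    // The client is closed automatically when the Spring context shuts down
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient(){
        String[] split = hostlist.split(",");
        HttpHost[] httpHost = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            // Each entry is expected to look like host:port
            String[] item = split[i].trim().split(":");
            httpHost[i] = new HttpHost(item[0],Integer.parseInt(item[1]),"http");
        }
        return new RestHighLevelClient(RestClient.builder(httpHost));
    }
}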
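
The parsing in the configuration class expects every entry to be exactly host:port. As a hedged sketch, that step could be pulled into a small helper that trims whitespace, skips empty entries, and falls back to port 9200 when no port is given. `HostListParser`, `HostPort`, and the default-port fallback are illustrative assumptions rather than part of the original configuration, and the sketch uses only the JDK so it can be tried without Spring or Elasticsearch on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not in the original article): parses "host:port,host:port".
final class HostListParser {

    /** Simple host/port pair; stands in for org.apache.http.HttpHost in this sketch. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Assumption: fall back to the default Elasticsearch HTTP port when none is given.
    static final int DEFAULT_PORT = 9200;

    static List<HostPort> parse(String hostlist) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : hostlist.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, DEFAULT_PORT));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1).trim());
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("127.0.0.1:9200, 127.0.0.2")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```

Each `HostPort` could then be mapped to `new HttpHost(host, port, "http")` inside the bean method. IPv6 literals are not handled by this sketch.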

Aggregation search test code

1. Group by color and count the number of sales for each color

GET /tvs/_search
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      }
    }
  }
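}

import java.io.IOException;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestAggs {

    @Autowired
    private RestHighLevelClient client;

    // Group by color and count the number of sales for each color
    @Test
    public void testAggs() throws IOException {
        // 1. Build the request
        SearchRequest searchRequest = new SearchRequest("tvs");
        // Request body
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(0);
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_color").field("color");
        searchSourceBuilder.aggregation(termsAggregationBuilder);
        // Attach the request body to the request
        searchRequest.source(searchSourceBuilder);
        // 2. Execute
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        // 3. Read the results
        Aggregations aggregations = searchResponse.getAggregations();
        Terms terms = aggregations.get("group_by_color");
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            System.out.println("key: " + key);
            long docCount = bucket.getDocCount();
            System.out.println("docCount: " + docCount);
            System.out.println("-------------------------------");
        }
    }
}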
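
To make the result shape concrete: a terms aggregation returns one bucket per distinct field value together with its document count, and by default Elasticsearch returns the ten largest buckets ordered by descending count. The plain-Java sketch below mimics that over an in-memory list. The sample colors are made up for illustration and `TermsSketch` is a hypothetical name; it shows what the buckets mean, not how Elasticsearch computes them internally.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration of what terms buckets contain; not Elasticsearch code.
final class TermsSketch {

    /** Counts occurrences per value and keeps the `size` largest buckets, largest first. */
    static Map<String, Long> termsBuckets(List<String> values, int size) {
        Map<String, Long> counts = values.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                // order by doc count descending, then by key so ties stay deterministic
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(size)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the "color" field of the tvs index.
        List<String> colors = Arrays.asList("red", "blue", "red", "green", "red", "blue");
        termsBuckets(colors, 10).forEach((key, docCount) ->
                System.out.println("key: " + key + ", docCount: " + docCount));
    }
}
```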

2. Group by color; compute the number of sales and the average selling price for each color

GET /tvs/_search
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}

@Test
public void testAggsAndAvg() throws IOException {
    // 1. Build the request
    SearchRequest searchRequest = new SearchRequest("tvs");
    // Request body
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_color").field("color");

    // Add a sub-aggregation under the terms aggregation
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // Attach the request body to the request
    searchRequest.source(searchSourceBuilder);
    // 2. Execute
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Read the results
    Aggregations aggregations = searchResponse.getAggregations();
    Terms terms = aggregations.get("group_by_color");
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println("key: " + key);
        long docCount = bucket.getDocCount();
        System.out.println("docCount: " + docCount);
        // Sub-aggregation results are attached to each bucket
        Aggregations aggregations1 = bucket.getAggregations();
        Avg avg_price = aggregations1.get("avg_price");
        System.out.println("avg_price: "+avg_price.getValue());
        System.out.println("-------------------------------");
    }
}

3. Group by color; compute the number of sales plus the average, maximum, minimum, and sum of the selling price for each color.

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "group_by_color": {
      "terms": {
        "field": "color"
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        },
        "min_price":{
          "min": {
            "field": "price"
          }
        },
        "max_price":{
          "max": {
            "field": "price"
          }
        },
        "sum_price":{
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}

@Test
public void testAggsAndMore() throws IOException {
    // 1. Build the request
    SearchRequest searchRequest = new SearchRequest("tvs");
    // Request body
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_color").field("color");

    // Add several sub-aggregations under the terms aggregation
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_price").field("price");
    MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("max_price").field("price");
    SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("sum_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    termsAggregationBuilder.subAggregation(minAggregationBuilder);
    termsAggregationBuilder.subAggregation(maxAggregationBuilder);
    termsAggregationBuilder.subAggregation(sumAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // Attach the request body to the request
    searchRequest.source(searchSourceBuilder);
    // 2. Execute
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Read the results
    Aggregations aggregations = searchResponse.getAggregations();
    Terms terms = aggregations.get("group_by_color");
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println("key: " + key);
        long docCount = bucket.getDocCount();
        System.out.println("docCount: " + docCount);
        // Each metric is looked up by the name it was registered under
        Aggregations aggregations1 = bucket.getAggregations();
        Avg avg_price = aggregations1.get("avg_price");
        System.out.println("avg_price: "+avg_price.getValue());
        Max max_price = aggregations1.get("max_price");
        System.out.println("max_price: "+max_price.getValue());
        Min min_price = aggregations1.get("min_price");
        System.out.println("min_price: "+min_price.getValue());
        Sum sum_price = aggregations1.get("sum_price");
        System.out.println("sum_price: "+sum_price.getValue());
        System.out.println("-------------------------------");
    }
}
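
As a rough illustration of what the per-color metrics above represent, the following plain-Java sketch groups an in-memory list by color and derives count, average, minimum, maximum, and sum with `DoubleSummaryStatistics`. The `Sale` class, the `ColorStatsSketch` name, and the sample prices are hypothetical stand-ins for documents in the tvs index; the sketch only mirrors the meaning of the buckets and does not call Elasticsearch.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory equivalent of terms + avg/min/max/sum sub-aggregations.
final class ColorStatsSketch {

    /** Made-up stand-in for one document of the tvs index. */
    static final class Sale {
        private final String color;
        private final double price;

        Sale(String color, double price) {
            this.color = color;
            this.price = price;
        }

        String getColor() { return color; }
        double getPrice() { return price; }
    }

    /** One entry per color, carrying count, average, min, max and sum of the price. */
    static Map<String, DoubleSummaryStatistics> statsByColor(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
                Sale::getColor, TreeMap::new, Collectors.summarizingDouble(Sale::getPrice)));
    }

    public static void main(String[] args) {
        List<Sale> sales = Arrays.asList(
                new Sale("red", 1000), new Sale("red", 3000), new Sale("blue", 2000));
        statsByColor(sales).forEach((color, stats) -> System.out.println(
                "key: " + color + ", docCount: " + stats.getCount()
                        + ", avg_price: " + stats.getAverage()
                        + ", min_price: " + stats.getMin()
                        + ", max_price: " + stats.getMax()
                        + ", sum_price: " + stats.getSum()));
    }
}
```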

4. Split the selling price into ranges of 2000 and compute the total sales amount of each range (histogram)

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "by_histogram": {
      "histogram": {
        "field": "price",
        "interval": 2000
      },
      "aggs": {
        "sum_price": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}

@Test
public void testAggsAndHistogram() throws IOException {
    // 1. Build the request
    SearchRequest searchRequest = new SearchRequest("tvs");
    // Request body
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    // One bucket per price range of width 2000, with the price sum as a sub-aggregation
    HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders.histogram("by_histogram").field("price").interval(2000);
    SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("sum_price").field("price");
    histogramAggregationBuilder.subAggregation(sumAggregationBuilder);
    searchSourceBuilder.aggregation(histogramAggregationBuilder);
    // Attach the request body to the request
    searchRequest.source(searchSourceBuilder);
    // 2. Execute
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Read the results
    Aggregations aggregations = searchResponse.getAggregations();
    Histogram histogram = aggregations.get("by_histogram");
    List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println("key: " + key);
        long docCount = bucket.getDocCount();
        System.out.println("docCount: " + docCount);
        Aggregations aggregations1 = bucket.getAggregations();
        Sum sum_price = aggregations1.get("sum_price");
        System.out.println("sum_price: "+sum_price.getValue());
        System.out.println("-------------------------------");
    }
}
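
For a histogram with no offset, each value falls into the bucket whose key is the value rounded down to a multiple of the interval, that is floor(value / interval) * interval. The sketch below applies that rule to a few made-up prices and sums them per bucket, which is what the sum_price sub-aggregation reports. `HistogramKeySketch` and the sample prices are hypothetical, and unlike Elasticsearch the sketch lists only non-empty buckets.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of histogram bucketing; not Elasticsearch code.
final class HistogramKeySketch {

    /** Bucket key for a value, assuming no offset: the value rounded down to the interval. */
    static double bucketKey(double value, double interval) {
        return Math.floor(value / interval) * interval;
    }

    /** Sums prices per bucket key, ordered by key. */
    static Map<Double, Double> sumByBucket(double[] prices, double interval) {
        Map<Double, Double> sums = new TreeMap<>();
        for (double price : prices) {
            sums.merge(bucketKey(price, interval), price, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Made-up prices standing in for the "price" field of the tvs index.
        double[] prices = {1000, 2500, 3000, 8000};
        sumByBucket(prices, 2000).forEach((key, sum) ->
                System.out.println("key: " + key + ", sum_price: " + sum));
    }
}
```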

5. Compute the total sales amount for each quarter

GET /tvs/_search
{
  "size": 0,
  "aggs": {
    "date_sales": {
      "date_histogram": {
        "field": "sold_date",
        "calendar_interval": "quarter",
        "format": "yyyy-MM-dd",
        "min_doc_count": 0,
        "extended_bounds": {
          "min": "2019-01-01",
          "max": "2020-12-31"
        }
      },
      "aggs": {
        "sum_price": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}

@Test
public void testAggsAndDateHistogram() throws IOException {
    // 1. Build the request
    SearchRequest searchRequest = new SearchRequest("tvs");
    // Request body
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    // One bucket per calendar quarter; empty quarters inside the bounds are still returned
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("date_sales").field("sold_date")
            .calendarInterval(DateHistogramInterval.QUARTER).format("yyyy-MM-dd")
            .minDocCount(0).extendedBounds(new ExtendedBounds("2019-01-01", "2020-12-31"));

    SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("sum_price").field("price");
    dateHistogramAggregationBuilder.subAggregation(sumAggregationBuilder);
    searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
    // Attach the request body to the request
    searchRequest.source(searchSourceBuilder);
    // 2. Execute
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Read the results
    Aggregations aggregations = searchResponse.getAggregations();
    ParsedDateHistogram dateHistogram = aggregations.get("date_sales");
    List<? extends Histogram.Bucket> buckets = dateHistogram.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println("key: " + key);
        long docCount = bucket.getDocCount();
        System.out.println("docCount: " + docCount);
        Aggregations aggregations1 = bucket.getAggregations();
        Sum sum_price = aggregations1.get("sum_price");
        System.out.println("sum_price: "+sum_price.getValue());
        System.out.println("-------------------------------");
    }
}
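
With a quarterly calendar interval, each bucket key is the first day of a quarter, and combining min_doc_count 0 with extended_bounds makes the response include a bucket for every quarter in the range even when nothing was sold. The sketch below lists the keys one would expect for the bounds used above, assuming the default UTC time zone. `QuarterBucketsSketch` is a hypothetical helper built on java.time and does not talk to Elasticsearch.

```java
import java.time.LocalDate;
import java.time.temporal.IsoFields;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of quarterly bucket keys; not Elasticsearch code.
final class QuarterBucketsSketch {

    /** First day of the calendar quarter that contains the given date. */
    static LocalDate quarterStart(LocalDate date) {
        return date.with(IsoFields.DAY_OF_QUARTER, 1L);
    }

    /** All quarter start dates from the quarter of `min` up to the quarter of `max`. */
    static List<LocalDate> quarterStarts(LocalDate min, LocalDate max) {
        List<LocalDate> keys = new ArrayList<>();
        for (LocalDate key = quarterStart(min); !key.isAfter(max); key = key.plusMonths(3)) {
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        // Bounds taken from the extended_bounds of the query above.
        quarterStarts(LocalDate.of(2019, 1, 1), LocalDate.of(2020, 12, 31))
                .forEach(key -> System.out.println("key: " + key));
    }
}
```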

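Elasticsearch 7 SQL features

There are three ways to run SQL statements in ES: the RESTful API, the SQL-CLI command-line tool, and a JDBC connection. The SQL statements themselves are essentially the same in all three, so we start with the RESTful approach.

Calling SQL over REST

Quick start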

POST /_sql?format=txt
{
  "query":"select * from tvs"
}

POST /_sql?format=txt
{
  "query":"select color,avg(price),min(price),max(price),sum(price) from tvs group by color"
}
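
The same REST call can be issued from Java without any Elasticsearch dependency. The following is a minimal sketch assuming Java 11 or later (for java.net.http) and a node at http://127.0.0.1:9200 as in the configuration earlier. `EsSqlRequestSketch` and its methods are hypothetical names, and the JSON escaping is deliberately minimal.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper (not from the article); assumes Java 11+ for java.net.http.
final class EsSqlRequestSketch {

    /** Wraps the SQL text in the JSON body expected by POST /_sql. */
    static String buildBody(String sql) {
        // Minimal escaping for backslashes and double quotes only; a JSON library is safer.
        String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"query\":\"" + escaped + "\"}";
    }

    /** Builds the POST request with a plain-text response format. */
    static HttpRequest buildRequest(String baseUrl, String sql) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_sql?format=txt"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(sql)))
                .build();
    }

    /** Sends the request; needs a reachable Elasticsearch node. */
    static String execute(String baseUrl, String sql) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl, sql), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://127.0.0.1:9200",
                "select color, avg(price) from tvs group by color");
        // Only prints the request line; call execute(...) against a running cluster to see rows.
        System.out.println(request.method() + " " + request.uri());
    }
}
```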

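SQL-CLI command-line tool

Go to the Elasticsearch installation directory and start the CLI:

bin/elasticsearch-sql-cli

You can then run SQL statements at the SQL prompt.

Combining ES SQL with other DSL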

POST /_sql?format=txt
{
  "query":"select * from tvs",
  "filter":{
    "range":{
      "price":{
        "gte" : 1200,
        "lte" : 2000
      }
    }
  }
}

Running SQL from Java code

1. Enable the Platinum trial license in Kibana
2. Add the required dependency

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>x-pack-sql-jdbc</artifactId>
    <version>7.3.0</version>
</dependency>

<repositories><!-- if the jar cannot be downloaded, add this repository to the pom file -->
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

3. Test code

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class TestESJdbc {

    public static void main(String[] args) throws SQLException {
        // Open connection, statement and result set in try-with-resources so they are always closed
        try (Connection connection = DriverManager.getConnection("jdbc:es://http://127.0.0.1:9200");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from tvs")) {
            // Print the first four columns of every row
            while (resultSet.next()){
                System.out.println(resultSet.getString(1));
                System.out.println(resultSet.getString(2));
                System.out.println(resultSet.getString(3));
                System.out.println(resultSet.getString(4));
                System.out.println("--------");
            }
        }
    }
}