Some Aggregation Operations in ElasticSearch
2020-05-18
香山上的麻雀
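Today I had a small task. The data used to live in Hive: for every query, the backend concatenated a SQL string and ran it against the database over JDBC, which was very slow. So I decided to switch the data source to ElasticSearch. Here is the whole process.
Note: the ElasticSearch version used is 6.3.1.
The original concatenated SQL: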
select sum(cast(bdp_user_rfm.order_num as int)) as order_num,
bdp_user_rfm.rfm_label as rfm,
count(bdp_user_rfm.user_id) as user_count,
sum(cast(bdp_user_rfm.monetary as double)) as money_sum
from bdp_user_rfm inner join user_basic_info
on user_basic_info.user_id = bdp_user_rfm.user_id
where 1=1 and bdp_user_rfm.etl_date = (select etl_date from bdp_user_rfm group by etl_date order by etl_date desc limit 1)
and bdp_user_rfm.time = 'all'
group by bdp_user_rfm.rfm_label;
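The query above uses two tables joined on user_id. Both tables share one property: each user_id appears in exactly one row (in a relational database, user_id could serve as the primary key). So I used Hive on ES to load the data from both tables into the same index, with user_id as the document id. That way the join is already done when the data is written to ES.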
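The join-at-write-time idea can be sketched in plain Java, with a Map standing in for the index and user_id as the document _id. This is a minimal sketch under one loud assumption: a second write with the same _id merges its fields into the existing document (upsert-style) instead of replacing it. With the elasticsearch-hadoop connector, that behavior is governed by settings such as `es.mapping.id` and `es.write.operation`. The post does not say whether the original pipeline used them. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: two "tables" written into one "index" keyed by user_id.
// Assumption: a write MERGES fields into an existing document (upsert); it does not replace it.
class JoinByIdSketch {

    // Upserts every row into the index, using the row's user_id as the document _id.
    static void upsertAll(Map<String, Map<String, Object>> index, List<? extends Map<String, ?>> rows) {
        for (Map<String, ?> row : rows) {
            String id = String.valueOf(row.get("user_id"));
            // The first write creates the document; later writes add or overwrite single fields.
            index.computeIfAbsent(id, k -> new LinkedHashMap<>()).putAll(row);
        }
    }

    // Writes both tables; rows sharing a user_id end up merged into one document.
    static Map<String, Map<String, Object>> joinById(List<? extends Map<String, ?>> rfmRows,
                                                     List<? extends Map<String, ?>> basicInfoRows) {
        Map<String, Map<String, Object>> index = new HashMap<>();
        upsertAll(index, rfmRows);
        upsertAll(index, basicInfoRows);
        return index;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> index = joinById(
                List.of(Map.of("user_id", "u1", "rfm_label", "A"),
                        Map.of("user_id", "u2", "rfm_label", "B")),
                List.of(Map.of("user_id", "u1", "clusterid", "c1")));
        System.out.println(index);
    }
}
```

One consequence is worth checking. Merging by _id behaves like a full outer join, so a user who exists in only one table still gets a document (u2 above). The SQL uses an inner join. If that difference matters, filter on a field that only the other table provides.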
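The SQL above can be converted as follows:
- The GROUP BY on rfm_label becomes a terms aggregation on rfm_label.keyword.
- count(user_id) becomes each bucket's document count.
- The two SUMs become sum sub-aggregations. For these to work, order_num and monetary must be mapped as numeric fields in the index, because the request has no counterpart to the CASTs.

The request below shows only the aggregation part. The filters on time and etl_date are added in the Java code that follows it.
ElasticSearch request in console format: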
POST /xxxxxxxxxxxxxx
{
  "size": 1,
  "aggs": {
    "group_by_rfm_label": {
      "terms": {
        "field": "rfm_label.keyword",
        "size": 10,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          { "_count": "desc" },
          { "_key": "asc" }
        ]
      },
      "aggregations": {
        "order_num": {
          "sum": {
            "field": "order_num",
            "format": "#"
          }
        },
        "monetary": {
          "sum": {
            "field": "monetary"
          }
        }
      }
    }
  }
}
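To make the semantics of this request concrete, here is an in-memory sketch in plain Java, with no Elasticsearch client involved. It computes what the terms aggregation and its two sum sub-aggregations compute: group by rfm_label, count documents, sum order_num and monetary, order by count descending and then key ascending, and keep the top `size` buckets. The Doc and Bucket classes are hypothetical. The sketch runs as if on a single shard, so it ignores the per-shard approximation that real terms aggregations can have (the error that `show_term_doc_count_error` reports).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the request above: terms on rfm_label plus two sum
// sub-aggregations, ordered by _count desc, then _key asc. No sharding is modeled.
class TermsSumSketch {

    // Hypothetical flattened document with only the fields the aggregation reads.
    static final class Doc {
        final String rfmLabel;
        final double orderNum;
        final double monetary;

        Doc(String rfmLabel, double orderNum, double monetary) {
            this.rfmLabel = rfmLabel;
            this.orderNum = orderNum;
            this.monetary = monetary;
        }
    }

    // One terms bucket with its doc count and the two sum values.
    static final class Bucket {
        final String key;
        long docCount;      // plays the role of count(user_id) in the SQL
        double orderNumSum; // "order_num" sum sub-aggregation
        double monetarySum; // "monetary" sum sub-aggregation

        Bucket(String key) {
            this.key = key;
        }
    }

    static List<Bucket> aggregate(List<Doc> docs, int size) {
        Map<String, Bucket> buckets = new TreeMap<>();
        for (Doc d : docs) {
            Bucket b = buckets.computeIfAbsent(d.rfmLabel, Bucket::new);
            b.docCount++;
            b.orderNumSum += d.orderNum;
            b.monetarySum += d.monetary;
        }
        // Buckets exist only for keys seen at least once, matching "min_doc_count": 1.
        List<Bucket> result = new ArrayList<>(buckets.values());
        // "order": [{"_count": "desc"}, {"_key": "asc"}]
        result.sort(Comparator.comparingLong((Bucket b) -> b.docCount).reversed()
                .thenComparing(b -> b.key));
        // "size": keep only the top buckets.
        return new ArrayList<>(result.subList(0, Math.min(size, result.size())));
    }
}
```

For example, documents labeled B, A, B, C yield buckets in the order B (2 docs), A, C. A and C tie on count, so the key breaks the tie.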
In the API, it is implemented with the Java High Level REST Client:
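// Package names below are for the 6.3.x High Level REST Client.
// Assumptions: JSONArray/JSONObject come from fastjson and StringUtils from commons-lang3;
// getClient()/returnClient() are the author's own client helpers (not shown).
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Map;

public static JSONArray getRFMInfo(String time, String clusterId, String key, String code) {
    JSONArray jsonArray = new JSONArray();
    RestHighLevelClient client = null;
    try {
        client = getClient();
        SearchRequest searchRequest = new SearchRequest("bdp_dev_profile_user_basic_label");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Time filter (required field), equivalent to bdp_user_rfm.time = 'all'
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("time", time));
        // Optional clusterId filter
        if (StringUtils.isNotBlank(clusterId)) {
            boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.termQuery("clusterid.keyword", clusterId));
        }
        // Optional filter on the user's organization; match is a full-text (analyzed) query, not an exact match
        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(code)) {
            boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.matchQuery(key, code));
        }
        // Replaces the SQL subquery: find the latest etl_date with a max aggregation
        MaxAggregationBuilder maxEtlDate = AggregationBuilders
                .max("max_etl_date").field("etl_date").format("yyyy-MM-dd");
        String maxDate = "";
        try {
            // size(0): only the aggregation result is needed, not the hits
            ParsedMax max_etl_date = (ParsedMax) client.search(searchRequest.source(
                    new SearchSourceBuilder().size(0).aggregation(maxEtlDate)))
                    .getAggregations().asMap().get("max_etl_date");
            maxDate = max_etl_date.getValueAsString();
            System.out.println("maxDate = " + maxDate);
        } catch (Exception e) {
            // If this lookup fails, maxDate stays empty and the etl_date filter below is skipped
            e.printStackTrace();
        }
        if (StringUtils.isNotBlank(maxDate)) {
            boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.termQuery("etl_date", maxDate));
        }
        // GROUP BY rfm_label; each bucket's doc count plays the role of count(user_id)
        TermsAggregationBuilder group_by = AggregationBuilders.terms("group_by_rfm_label").field("rfm_label.keyword");
        SumAggregationBuilder sum_order_num = AggregationBuilders.sum("order_num").field("order_num");
        // Render value_as_string as an integer
        sum_order_num.format("#");
        SumAggregationBuilder sum_monetary = AggregationBuilders.sum("monetary").field("monetary");
        // Render value_as_string with 7 decimal places
        sum_monetary.format("0.0000000");
        group_by.subAggregation(sum_order_num);
        group_by.subAggregation(sum_monetary);
        // size(0): only the aggregations are read below
        sourceBuilder.size(0).query(boolQueryBuilder).aggregation(group_by);
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest);
        Map<String, Aggregation> aggregationMap = response.getAggregations().asMap();
        ParsedStringTerms user_count = (ParsedStringTerms) aggregationMap.get("group_by_rfm_label");
        for (Terms.Bucket bucket : user_count.getBuckets()) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("rfm", bucket.getKey());
            jsonObject.put("user_count", bucket.getDocCount());
            /* Sample output: 一般保持客户 (general retention customers) --> 96844 206150932.24, order_num ===> 96887 */
            // System.out.println(bucket.getKey() + "-->" + bucket.getDocCount());
            Aggregations aggregations = bucket.getAggregations();
            Map<String, Aggregation> map = aggregations.asMap();
            // getValue() returns the raw double; the format set above only affects getValueAsString()
            jsonObject.put("money_sum", ((ParsedSum) map.get("monetary")).getValue());
            // System.out.println(((ParsedSum) map.get("monetary")).getValueAsString());
            jsonObject.put("order_num", ((ParsedSum) map.get("order_num")).getValue());
            // System.out.println("order_num ===>" + ((ParsedSum) map.get("order_num")).getValueAsString());
            jsonArray.add(jsonObject);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        returnClient(client);
    }
    return jsonArray;
}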
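getRFMInfo replaces the SQL subquery on etl_date with two round trips. First a max aggregation finds the latest date, then a term filter keeps only that date. The plain-Java sketch below reproduces just that logic on an in-memory list; the Row class and the method names are hypothetical. It also makes one behavior of the code visible: if the max lookup fails, maxDate stays empty and the filter is skipped. In that case the aggregation covers every etl_date, which the SQL never does.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Sketch of the two-step "latest etl_date" logic in getRFMInfo, on in-memory rows.
class LatestEtlDateSketch {

    // Hypothetical row with only the fields this step needs.
    static final class Row {
        final String userId;
        final String etlDate; // "yyyy-MM-dd", as rendered by the max aggregation's format

        Row(String userId, String etlDate) {
            this.userId = userId;
            this.etlDate = etlDate;
        }
    }

    // Step 1 (max aggregation): zero-padded "yyyy-MM-dd" strings sort chronologically.
    static Optional<String> latestEtlDate(List<Row> rows) {
        return rows.stream().map(r -> r.etlDate).max(Comparator.naturalOrder());
    }

    // Step 2 (term filter): a blank date means "no filter", mirroring
    // the StringUtils.isNotBlank(maxDate) check in getRFMInfo.
    static List<Row> filterByDate(List<Row> rows, String date) {
        if (date == null || date.trim().isEmpty()) {
            return rows;
        }
        return rows.stream().filter(r -> date.equals(r.etlDate)).collect(Collectors.toList());
    }
}
```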
For details on the data format conversion, see my other article:
https://blog.csdn.net/qq_26502245/article/details/106197146
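On the format settings used in the code: "#" and "0.0000000" read as java.text.DecimalFormat patterns. The sketch below assumes Elasticsearch applies them that way when it builds value_as_string, which is the string getValueAsString() returns in the client. getValue() returns the raw double, so in getRFMInfo the formats do not change the money_sum and order_num values written to the JSON. The class name is hypothetical. Also, Elasticsearch's rounding of halfway cases may differ from the DecimalFormat default used here.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Sketch: how the "format" patterns from getRFMInfo render a number,
// assuming they are applied as java.text.DecimalFormat patterns.
class SumFormatSketch {

    static String render(double value, String pattern) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM's default locale.
        DecimalFormat format = new DecimalFormat(pattern, DecimalFormatSymbols.getInstance(Locale.ROOT));
        return format.format(value);
    }

    public static void main(String[] args) {
        System.out.println(render(96887.0, "#"));             // 96887
        System.out.println(render(1234.5678, "0.0000000"));   // 1234.5678000
    }
}
```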