Design of the Offline Metrics Module
2019-08-20
王勇1024
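Background
Offline Metrics is an important feature of the Alpha platform. It selects sample data according to filter conditions and has the training program score those samples offline in real time, so that algorithm engineers can see how training is going without delay.
Because the project was urgent and the sample volume was small at the time, we temporarily used Redis as the sample store. Every filtering request read the full data set from Redis and then traversed all samples to pick out the ones that matched the conditions. As the number of samples grew rapidly, this approach hit an obvious performance bottleneck, and filtering time rose to more than 10 minutes.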
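The cost pattern of the Redis approach can be sketched as follows. This is only an illustration: the `Sample` class, its fields, and the in-memory list standing in for "everything read from Redis" are assumptions, since the article does not show the original data model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FullScanFilter {

    /** Hypothetical, simplified sample; the article does not show the real fields. */
    public static final class Sample {
        final String appId;
        final long userId;
        final long timestamp;

        public Sample(String appId, long userId, long timestamp) {
            this.appId = appId;
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    /** Checks every sample, so the cost grows with the total sample count, not with the number of matches. */
    public static List<Sample> filter(List<Sample> all, String appId, long start, long end) {
        List<Sample> matched = new ArrayList<>();
        for (Sample s : all) {
            if (s.appId.equals(appId) && s.timestamp >= start && s.timestamp <= end) {
                matched.add(s);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Stand-in for the full data set read from Redis (assumed data)
        List<Sample> all = Arrays.asList(
                new Sample("app-a", 1L, 100L),
                new Sample("app-b", 2L, 150L),
                new Sample("app-a", 3L, 900L));
        System.out.println(filter(all, "app-a", 0L, 200L).size()); // prints 1
    }
}
```

Even when only a handful of samples match, every sample is transferred and inspected, which is why the latency follows the total data volume.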
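The root cause of the poor performance was an unsuitable storage container, so the first step in fixing sample filtering was to find a suitable one. After some investigation we chose ES (Elasticsearch) as the store, for the following reasons:
- ES delivers excellent query performance
- The ELK stack is simple and convenient to use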
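As a rough intuition for why an indexed store helps here, the toy structure below maps a field value directly to the documents that contain it, so a lookup does not visit unrelated documents. It is a deliberately simplified sketch with hypothetical names, not a description of how ES is implemented internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TermIndexSketch {
    // field value -> ids of the documents that contain it
    private final Map<String, List<Integer>> postings = new HashMap<>();

    /** Registers a document under its appId at write time. */
    public void add(int docId, String appId) {
        postings.computeIfAbsent(appId, k -> new ArrayList<>()).add(docId);
    }

    /** Returns the matching document ids without touching other documents. */
    public List<Integer> lookup(String appId) {
        return postings.getOrDefault(appId, Collections.emptyList());
    }

    public static void main(String[] args) {
        TermIndexSketch index = new TermIndexSketch();
        index.add(1, "app-a");
        index.add(2, "app-b");
        index.add(3, "app-a");
        System.out.println(index.lookup("app-a")); // prints [1, 3]
    }
}
```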
1. Environment
Kibana
http://xxxx:5601/app/kibana#/management/kibana/index?_g=()
http://yyyy:5601/app/kibana#/management/kibana/index?_g=()
ElasticSearch
Installation directory
/opt/elk/es/elasticsearch-6.3.0
Logstash
Installation directory
/opt/elk/logstash-6.3.0
2. Importing Data from Kafka into ES
Create the configuration file
cd /opt/elk/logstash-6.3.0
vim lightmetrics.conf
# Each kafka block consumes one topic and tags its events,
# so that the output section can route them to the matching index.
input {
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id => "light_metrics_indata_bi_light_cjv_pair_online_online"
    topics => ["indata_str_light_metrics"]
    consumer_threads => 5
    decorate_events => true
    codec => json
    client_id => "logstash_light_metrics"
    tags => ["light_metrics"]
  }
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id => "random_bucket_indata_bi_light_cjv_pair_online_online"
    topics => ["indata_str_light_random_bucket"]
    consumer_threads => 5
    decorate_events => true
    codec => json
    client_id => "logstash_random_bucket"
    tags => ["random_bucket"]
  }
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id => "bench_mark_indata_bi_light_cjv_pair_online_online"
    topics => ["indata_str_light_bench_mark"]
    consumer_threads => 5
    decorate_events => true
    codec => json
    client_id => "logstash_benchmark"
    tags => ["bench_mark"]
  }
}
# Route each event to the index named after its tag.
output {
  if "light_metrics" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "light_metrics"
    }
  }
  else if "random_bucket" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "random_bucket"
    }
  }
  else if "bench_mark" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "bench_mark"
    }
  }
  # Debug branch: events of type "news" that carry none of the tags above are printed to stdout.
  else if [type] == "news" {
    stdout { codec => rubydebug }
  }
}
Start Logstash
nohup ./bin/logstash -f lightmetrics.conf --path.data=/opt/elk/lightmetrics --path.logs=/opt/elk/lightmetrics/logs/ >/dev/null 2>&1 &
Log file
/opt/elk/lightmetrics/logs/logstash-plain.log
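Once Logstash is running, one way to confirm that documents are arriving is to ask ES for the document count of each index through the `_count` endpoint. The sketch below is an assumption about how such a check could look, not part of the original setup: the class and method names are hypothetical, and the host is one of the ES nodes from the configuration above. Without arguments it only prints the URLs; pass a `host:port` argument to actually send the requests.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

public class IndexCountCheck {
    // Index names taken from the Logstash output section
    static final String[] INDICES = {"light_metrics", "random_bucket", "bench_mark"};

    /** Builds the URL of the ES count API for one index. */
    public static String countUrl(String host, String index) {
        return "http://" + host + "/" + index + "/_count";
    }

    /** Sends a GET request and returns the raw JSON response body. */
    public static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(5000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        boolean live = args.length > 0;
        String host = live ? args[0] : "10.120.195.25:9200";
        for (String index : INDICES) {
            String url = countUrl(host, index);
            // Only touch the network when a host was passed explicitly
            System.out.println(live ? index + " -> " + fetch(url) : url);
        }
    }
}
```

If a count stays at zero, the Logstash log file listed above is the first place to look.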
3. ES Query Implementation
Maven dependencies
<!-- ElasticSearch -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.2</version>
</dependency>
Code
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.util.*;
// Shared transport client, created once when the class is loaded
private static TransportClient client;

static {
    try {
        // Connect to the ES cluster
        Settings settings = Settings.builder().put("cluster.name", "LogCollections").build();
        client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_25), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_26), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_27), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_28), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_29), 9300));
    } catch (Exception e) {
        // If setup fails, client stays null and later queries will fail
        e.printStackTrace();
    }
}
public static Map<String, Map<String, UserAction>> query(DatasetQuery query) throws Exception {
    logger.info(FastJsonUtils.toJSONString(query));
    final String appId = query.getAppId();
    final Integer batch = query.getBatch();
    final String ctype = query.getCtype();
    final Date start = query.getStart();
    final Date end = query.getEnd();
    final int minSample = query.getMinSample();
    final int maxSample = query.getMaxSample();
    final int userNum = query.getUserNum();

    // Build the query: every condition goes into the bool query's filter clause
    final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    final List<QueryBuilder> filter = boolQueryBuilder.filter();
    if (StringUtils.isNotBlank(appId)) {
        filter.add(QueryBuilders.queryStringQuery(appId).field(Constants.APPID));
    }
    if (batch != null) {
        filter.add(QueryBuilders.termQuery(Constants.BATCH, batch));
    }
    if (StringUtils.isNotBlank(ctype)) {
        filter.add(QueryBuilders.queryStringQuery(ctype).field(Constants.TYPE));
    }
    // Restrict to the requested time range
    if (start != null && end != null) {
        filter.add(QueryBuilders.rangeQuery(Constants.TIMESTAMP)
                .from(start.getTime()).to(end.getTime()));
    }

    // User-to-document mapping: one bucket per userid
    final TermsAggregationBuilder useridAgg = AggregationBuilders.terms(Constants.USER_COUNT)
            .field(Constants.USERID)
            .minDocCount(minSample)  // minimum number of samples a user must have
            .size(userNum);          // maximum number of user buckets returned
    // For each user, return up to maxSample documents
    final TopHitsAggregationBuilder topHitsAgg =
            AggregationBuilders.topHits("resource_value").size(maxSample);
    useridAgg.subAggregation(topHitsAgg);

    // Run the search; only the aggregation is needed, so no top-level hits are requested
    final long st = System.currentTimeMillis();
    final SearchResponse response = client.prepareSearch(query.getIndex())
            .setQuery(boolQueryBuilder)
            .addAggregation(useridAgg)
            .setSize(0)
            .get(TimeValue.timeValueMinutes(10));
    logger.info("Elapsed (ms): " + (System.currentTimeMillis() - st));

    final Map<String, Map<String, UserAction>> resultMap = new HashMap<>();
    final LongTerms userCountAgg = response.getAggregations().get(Constants.USER_COUNT);
    final List<LongTerms.Bucket> userBuckets = userCountAgg.getBuckets();
    if (userBuckets.isEmpty()) {
        logger.warn("userBuckets is empty");
    }
    // Parse the buckets into userid -> (docid -> UserAction)
    for (LongTerms.Bucket userBucket : userBuckets) {
        final InternalTopHits topHits = userBucket.getAggregations().get("resource_value");
        for (SearchHit searchHit : topHits.getHits()) {
            final UserAction userAction =
                    FastJsonUtils.toBean(searchHit.getSourceAsString(), UserAction.class);
            final String userId = String.valueOf(userAction.getUserid());
            final Map<String, UserAction> actionMap =
                    resultMap.computeIfAbsent(userId, k -> new HashMap<>());
            actionMap.put(userAction.getDocid(), userAction);
            // Stop once this user has reached the maximum number of samples
            if (actionMap.size() >= maxSample) {
                break;
            }
        }
    }
    return resultMap;
}
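For debugging in Kibana it can help to see roughly what request body the builders above correspond to. The sketch below assembles an approximate equivalent as a JSON string. The field names `batch`, `timestamp` and `userid` and the aggregation name `user_count` are assumptions, because the values behind `Constants` are not shown in the article; only `resource_value` is taken from the code. The optional `appId` and `ctype` `query_string` filters are left out for brevity.

```java
import java.util.Locale;

public class MetricsQueryDsl {

    /**
     * Builds an approximate search body: a filtered bool query plus a terms
     * aggregation per user with a top_hits sub-aggregation.
     * All field names are assumed, see the note above.
     */
    public static String build(int batch, long startMillis, long endMillis,
                               int userNum, int minSample, int maxSample) {
        return String.format(Locale.ROOT,
                "{\"size\":0,"
                + "\"query\":{\"bool\":{\"filter\":["
                + "{\"term\":{\"batch\":%d}},"
                + "{\"range\":{\"timestamp\":{\"gte\":%d,\"lte\":%d}}}]}},"
                + "\"aggs\":{\"user_count\":{"
                + "\"terms\":{\"field\":\"userid\",\"size\":%d,\"min_doc_count\":%d},"
                + "\"aggs\":{\"resource_value\":{\"top_hits\":{\"size\":%d}}}}}}",
                batch, startMillis, endMillis, userNum, minSample, maxSample);
    }

    public static void main(String[] args) {
        // Example values only: batch 3, a one-second window, 100 users, 5 to 20 samples each
        System.out.println(build(3, 1000L, 2000L, 100, 5, 20));
    }
}
```

The printed body can be pasted into a search request against one of the indices to compare its buckets with what the Java method returns.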
4. Results in Production
Data ingestion rate
![](https://img.haomeiwen.com/i13618762/e7c12a92b94a4506.png)
Document count and disk usage
![](https://img.haomeiwen.com/i13618762/b241597613e8d012.png)
Query latency
![](https://img.haomeiwen.com/i13618762/588b774ddb33411e.png)