Machine Learning Platform

Design of the "Offline Metrics" Module

2019-08-20  王勇1024

Background

Offline Metrics is one of the most important features of the Alpha platform: it selects sample data according to filter conditions and, in near real time, runs offline prediction to score those samples with the trained model, so that algorithm engineers can promptly gauge how well training is going.

Because the project was initially on a tight schedule and the sample volume was small, Redis was temporarily used as the store for sample data. Every time samples were filtered, the full data set was read from Redis and each sample was iterated over to pick out those matching the conditions (roughly as sketched below). As the sample volume grew rapidly, this approach hit an obvious performance bottleneck, and filtering came to take more than 10 minutes.
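For illustration only, the old flow looked roughly like the sketch below. The Jedis client, the sample:* key layout, and the naive substring filter are all assumptions made here for clarity; the original Redis code is not shown in this article.

import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;

public class RedisScanFilter {
    // Rough sketch of the old approach (illustrative assumptions, not the original code):
    // read every sample back from Redis, then filter in application code.
    public static List<String> filterSamples(String redisHost, String conditionValue) {
        List<String> matched = new ArrayList<>();
        try (Jedis jedis = new Jedis(redisHost, 6379)) {
            for (String key : jedis.keys("sample:*")) {      // full key scan: O(total samples)
                String sampleJson = jedis.get(key);           // one round trip per sample
                if (sampleJson != null && sampleJson.contains(conditionValue)) {  // naive filter
                    matched.add(sampleJson);
                }
            }
        }
        return matched;
    }
}

Every query touches every sample, so latency grows linearly with the data volume, which is why filtering degraded as the data set grew.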

The root cause of the poor performance was an ill-suited storage backend, so the first step in fixing sample filtering was to find a more appropriate store. After some investigation we settled on Elasticsearch (ES), for the following reasons:

  1. ES offers excellent query performance
  2. The ELK stack is simple and convenient to use

1. Environment

Kibana

http://xxxx:5601/app/kibana#/management/kibana/index?_g=()

http://yyyy:5601/app/kibana#/management/kibana/index?_g=()

ElasticSearch

Installation directory

/opt/elk/es/elasticsearch-6.3.0

Logstash

Installation directory

/opt/elk/logstash-6.3.0

2. Importing Kafka Data into ES

Create the configuration file

cd /opt/elk/logstash-6.3.0

vim lightmetrics.conf

input {
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id          => "light_metrics_indata_bi_light_cjv_pair_online_online"
    topics            => ["indata_str_light_metrics"]
    consumer_threads  => 5
    decorate_events   => true
    codec             => json
    client_id         => "logstash_light_metrics"
    tags              => ["light_metrics"]
  }
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id          => "random_bucket_indata_bi_light_cjv_pair_online_online"
    topics            => ["indata_str_light_random_bucket"]
    consumer_threads  => 5
    decorate_events   => true
    codec             => json
    client_id         => "logstash_random_bucket"
    tags              => ["random_bucket"]
  }
  kafka {
    bootstrap_servers => ["10.120.14.1:9092,10.120.14.2:9092,10.120.14.3:9092,10.120.14.4:9092"]
    group_id          => "bench_mark_indata_bi_light_cjv_pair_online_online"
    topics            => ["indata_str_light_bench_mark"]
    consumer_threads  => 5
    decorate_events   => true
    codec             => json
    client_id         => "logstash_benchmark"
    tags              => ["bench_mark"]
  }
}

output {
  if "light_metrics" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "light_metrics"
    }
  }
  else if "random_bucket" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "random_bucket"
    }
  }
  else if "bench_mark" in [tags] {
    elasticsearch {
      hosts => ["10.120.195.25:9200","10.120.195.26:9200","10.120.195.27:9200","10.120.195.28:9200","10.120.195.29:9200"]
      index => "bench_mark"
    }
  }
  else if [type] == "news" {
    stdout { codec => rubydebug }
  }
}

Start Logstash

nohup ./bin/logstash -f lightmetrics.conf --path.data=/opt/elk/lightmetrics --path.logs=/opt/elk/lightmetrics/logs/ >/dev/null 2>&1 &

Log directory

/opt/elk/lightmetrics/logs/logstash-plain.log
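
Once Logstash is running, it is worth sanity-checking that documents are actually landing in the target index. A minimal sketch is shown below; it assumes the TransportClient (client) and logger that section 3 sets up, and light_metrics is the index written by the first output block above.

// Minimal ingestion check (assumes the TransportClient `client` from section 3)
long total = client.prepareSearch("light_metrics")
        .setSize(0)                  // no documents needed, only the total hit count
        .get()
        .getHits()
        .getTotalHits();             // a plain long in the ES 6.x Java API
logger.info("light_metrics doc count: " + total);

The same check can also be done through the Kibana index management pages listed in section 1.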

3. ES Query Implementation

Maven dependencies


<!-- ElasticSearch -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.2</version>
</dependency>

Code


import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.util.*;

static {
    try {
        // Connect to the ES cluster; `client` is the static TransportClient field of the enclosing class
        Settings settings = Settings.builder().put("cluster.name", "LogCollections").build();
        client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(new TransportAddress(InetAddress.getByName(Constants.ES_HOST_25), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_26), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_27), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_28), 9300),
                        new TransportAddress(InetAddress.getByName(Constants.ES_HOST_29), 9300));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static Map<String, Map<String, UserAction>> query(DatasetQuery query) throws Exception {
    logger.info(FastJsonUtils.toJSONString(query));
    final String appId = query.getAppId();
    final Integer batch = query.getBatch();
    final String ctype = query.getCtype();
    final Date end = query.getEnd();
    final int maxSample = query.getMaxSample();
    final int minSample = query.getMinSample();
    final Date start = query.getStart();
    final int userNum = query.getUserNum();

    final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    final List<QueryBuilder> filter = boolQueryBuilder.filter();
    // final List<QueryBuilder> must = boolQueryBuilder.must();
    // filter.add(must);

    // Build the filter clauses
    if (StringUtils.isNotBlank(appId)) {
        final QueryBuilder appidQuery = QueryBuilders.queryStringQuery(appId).field(Constants.APPID);
        filter.add(appidQuery);
    }
    if (batch != null) {
        final QueryBuilder batchQuery = QueryBuilders.termQuery(Constants.BATCH, batch);
        filter.add(batchQuery);
    }
    if (StringUtils.isNotBlank(ctype)) {
        final QueryBuilder ctypeQuery = QueryBuilders.queryStringQuery(ctype).field(Constants.TYPE);
        filter.add(ctypeQuery);
    }
    // Filter by time range
    if (start != null && end != null) {
        final QueryBuilder timestampQuery = QueryBuilders.rangeQuery(Constants.TIMESTAMP)
                .from(start.getTime()).to(end.getTime());
        filter.add(timestampQuery);
    }

    // User-to-document mapping: one terms bucket per user id
    final TermsAggregationBuilder useridAgg = AggregationBuilders.terms(Constants.USER_COUNT).field(Constants.USERID)
            .size(Integer.MAX_VALUE);
    // Top hits returned for each user bucket
    final TopHitsAggregationBuilder topHitsAgg = AggregationBuilders.topHits("resource_value");
    topHitsAgg.storedFields();
    topHitsAgg.size(maxSample);
    useridAgg.subAggregation(topHitsAgg);
    // Minimum number of samples a user must have
    useridAgg.minDocCount(minSample);
    // Number of user buckets to return
    useridAgg.size(userNum);

    // Execute the search and collect the results into a SearchResponse
    long st = System.currentTimeMillis();
    SearchResponse response = client.prepareSearch(query.getIndex())
            // Set the query
            .setQuery(boolQueryBuilder)
            .addAggregation(useridAgg)
            .setSize(0)
            .get(TimeValue.timeValueMinutes(10));
    logger.info("query cost: " + (System.currentTimeMillis() - st) + " ms");

    final Map<String, Map<String, UserAction>> resultMap = new HashMap<>(userNum * maxSample);
    final LongTerms userCountAgg = response.getAggregations().get(Constants.USER_COUNT);
    final List<LongTerms.Bucket> userBuckets = userCountAgg.getBuckets();
    if (userBuckets.isEmpty()) {
        logger.warn("userBuckets is empty");
    }
    // Parse the documents in each user bucket
    final Iterator<LongTerms.Bucket> iterator = userBuckets.iterator();
    while (iterator.hasNext()) {
        final LongTerms.Bucket userBucket = iterator.next();
        final InternalTopHits topHits = userBucket.getAggregations().get("resource_value");
        final Iterator<SearchHit> it = topHits.getHits().iterator();
        while (it.hasNext()) {
            final SearchHit searchHit = it.next();
            final UserAction userAction = FastJsonUtils.toBean(searchHit.getSourceAsString(), UserAction.class);
            final String userId = userAction.getUserid() + "";
            Map<String, UserAction> actionMap = resultMap.get(userId);
            if (actionMap == null) {
                actionMap = new HashMap<>();
                resultMap.put(userId, actionMap);
            }
            actionMap.put(userAction.getDocid(), userAction);
            // Stop once the maximum sample count for this user is reached
            if (actionMap.size() >= maxSample) {
                break;
            }
        }
    }
    return resultMap;
}
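
For reference, a call site might look like the sketch below. The DatasetQuery setters (setIndex, setStart, and so on) are assumed here to mirror the getters used in query() and are not part of the original article; the field values are purely illustrative.

// Hypothetical usage sketch, assuming the call is made from the same class as query()
DatasetQuery q = new DatasetQuery();
q.setIndex("light_metrics");              // index written by Logstash in section 2
q.setAppId("demo_app");                   // hypothetical appId
q.setStart(new Date(System.currentTimeMillis() - 24 * 3600 * 1000L));
q.setEnd(new Date());
q.setMinSample(10);                       // at least 10 samples per user
q.setMaxSample(100);                      // at most 100 samples per user
q.setUserNum(500);                        // return up to 500 users
Map<String, Map<String, UserAction>> samples = query(q);
logger.info("users returned: " + samples.size());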

4. Results in Production

Data insertion rate

[screenshot]

Document count and disk usage

[screenshot]

Query latency

[screenshot]