python机器学习爬虫

Flink 在快手实时多维分析场景的应用

2020-06-23  本文已影响0人  阿里云技术

作者:董亭亭、徐明

摘要:作为短视频分享跟直播的平台,快手有诸多业务场景应用了 Flink,包括短视频、直播的质量监控、用户增长分析、实时数据处理、直播 CDN 调度等。此次主要介绍在快手使用 Flink 在实时多维分析场景的应用与优化。主要内容包括:

  1. Flink 在快手应用场景及规模
  2. 快手实时多维分析平台
  3. SlimBase-更省 IO、嵌入式共享 state 存储

Tips:点击下方链接可查看作者原版PPT及分享视频~
https://ververica.cn/developers/flink-forward-asia-2019/

Flink 在快手应用场景及规模

首先看 Flink 在快手的应用场景和规模。

1. 快手应用场景

640 1.png

快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时数仓、实时分析以及实时训练,最后的结果存到 Druid、Kudu、HBase 或者 ClickHouse 里面;同时 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后通过 Hive、MapReduce 或者 Spark 来做离线计算;最终实时计算和离线计算的结果数据会用内部自研 BI 工具 KwaiBI 来展现出来。

640 2.png

Flink 在快手典型的应用场景主要分为三大类:

640 3.png

Flink 在快手应用的典型场景案例包括:

2. Flink 集群规模

640 4.png

快手目前集群规模有 1500 台左右,日处理条目数总共有3万亿,峰值处理条目数大约是 3亿/s 左右。集群部署都是 On Yarn 模式,实时集群和离线集群混合部署,通过 Yarn 标签进行物理隔离,实时集群是 Flink 专用集群,针对隔离性、稳定性要求极高的业务部署。注:本文所涉及数据仅代表嘉宾分享时的数据。

快手实时多维分析平台

此处重点和大家分享下快手的实时多维分析平台。

1. 快手实时多维分析场景

640 5.png

快手内部有这样的应用场景,每天的数据量在百亿级别,业务方需要在数据中任选五个以内的维度组合进行全维的建模进而计算累计的 PV ( Page View 访问量 )、UV ( Unique Visitor 独立访客 )、新增或者留存等这样的指标,然后指标的计算结果要实时进行图形化报表展示供给业务分析人员进行分析。

2. 方案选型

640 6.png

现在社区已经有一些 OLAP 实时分析的工具,像 Druid 和 ClickHouse;目前快手采用的是 Flink+Kudu 的方案,在前期调研阶段对这三种方案从计算能力、分组聚合能力、查询并发以及查询延迟四个方面结合实时多维查询业务场景进行对比分析:

640 7.png

采用 Flink+Kudu 的方案主要思想是借鉴了 Kylin 的思路,Kylin 可以指定很多维度和指标进行离线的预计算然后将预计算结果存储到 Hbase 中;快手的方案是通过 Flink 实时计算指标,再实时地写到 Kudu 里面。

3. 方案设计

640 8.png

实时多维分析的整体的流程为:

接下来详细介绍一下实时多维分析的主要模块。

■ 数据预处理

640 9.png

KwaiBI 配置维度建模时选择的数据表,是经过提前预处理的:

■ 建模计算指标

640 10.png

数据预处理完成后,最重要的步骤是进行建模指标计算,此处支持 Cube、GroupingSet 方式维度组合来计算小时或者天累计的 UV ( Unique Visitor )、新增和留存等指标,可以根据用户配置按固定时间间隔定期输出结果;维度聚合逻辑中,通过逐层降维计算的方式会让 DAG 作业图十分复杂,如上图右上角模型所示;因此快手设计了两层降维计算模型,分为全维度层和剩余维度层,这样既利用了全维度层的聚合结果又简化了 DAG 作业图。

640 11.png

以 UV 类指标计算举例,两个黄色虚线框分别对应两层计算模块:全维计算和降维计算。

640 12.png

再重点介绍下,建模指标计算中的几个关键点。在建模指标计算中,为了避免维度数据倾斜问题,通过预聚合 ( 相同维度 hash 打散 ) 和全量聚合 ( 相同维度打散后聚合 ) 两种方式来解决。

为了解决 UV 精确去重问题,前文有提到,使用 Bitmap 进行精确去重,通过字典服务将 String 类型数据转换成 Long 类型数据进而便于存储到 Bitmap 中,因为统计 UV 要统计历史的数据,比如说按天累计,随着时间的推移,Bitmap 会越来越大,在 Rocksdb 状态存储下,读写过大的 KV 会比较耗性能,所以内部自定义了一个 BitmapState,将 Bitmap 进行分块存储,一个 blockid 对应一个局部的 bitmap,这样在 RocksDB 中存储时,一个 KV 会比较小,更新的时候也只需要根据 blockid 更新局部的 bitmap 就可以而不需要全量更新。

640 13.png

接下来,看新增类的指标计算,和刚刚 UV 的不同点是需要判断是否为新增用户,通过异步地访问外部的历史用户服务进行新增用户判断,再根据新增用户流计算新增 UV,这块计算逻辑和 UV 计算一致。

640 14.png

然后,再来看留存类指标计算,与 UV 计算不同的时候,不仅需要当天的数据还需要前一天的历史数据,这样才能计算出留存率,内部实现的时候是采用双 buffer state 存储,在计算的时候将双 buffer 数据相除就可以计算出留存率。

■ Kudu 存储

640 15.png

最后经过上面的计算逻辑后,会将结果存储到 Kudu 里面,其本身具有低延迟随机读写以及快速列扫描等特点,很适合实时交互分析场景;在存储方式上,首先对维度进行编码,然后按时间+维度组合+维度值组合作为主键,最终按维度组合、维度值组合、时间进行分区,这样有利于提高查询的效率快速获取到数据。

4. KwaiBI 展示

640 16.png

界面为配置 Cube 模型的截图,配置一些列并指定类型,再通过一个 SQL 语句来描述指标计算的逻辑,最终结果也会通过 KwaiBI 展示出来。

SlimBase-更省 IO、嵌入式共享 state 存储

接下来介绍一种比 RocksDB 更省 IO、嵌入式的共享 state 存储引擎:SlimBase。

1. 面临的挑战

640 17.png

首先看一下 Flink 使用 RocksDB 遇到的问题,先阐述一下快手的应用场景、广告展现点击流实时 Join 场景:打开快手 App 可能会收到广告服务推荐的广告视频,用户可能会点击展现的广告视频。

这样的行为在后端会形成两份数据流,一份是广告展现日志,一份是客户端点击日志。这两份数据进行实时 Join,并将 Join 结果作为样本数据用于模型训练,训练出的模型会被推送到线上的广告服务。

该场景下展现以后20分钟的点击被认为是有效点击,实时 Join 逻辑则是点击数据 Join 过去20分钟内的展现。其中,展现流的数据量相对比较大,20分钟数据在 1TB 以上。检查点设置为五分钟,Backend 选择 RocksDB。

640 18.png

在这样的场景下,面临着磁盘 IO 开销70%,其中50%开销来自于 Compaction;在 Checkpoint 期间,磁盘 IO 开销达到了100%,耗时在1~5分钟,甚至会长于 Checkpoint 间隔,业务能明显感觉到反压。经过分析找出问题:

2. 解决方案

640 19.png

由于出现上文阐述的问题,开始寻找解决方案,整体思路是在数据写入时直接落地到共享存储中,避免 Checkpoint 带来的数据拷贝问题。手段是尝试使用更省 IO 的 Compaction,例如使用 SizeTieredCompation 方式,或者利用时序数据的特点使用并改造 FIFOCompaction。综合比较共享存储、SizeTieredCompation、基于事件时间的 FIFOCompaction 以及技术栈四个方面得出共识:HBase 代替 RocksDB 方案。

640 20.png

但是 HBase 有些方面相比 RocksDB 较差:

综合上面几点原因,快手达成了第二个共识,将 HBase 瘦身,改造为嵌入式共享存储系统。

3. 实现方案

640 21.png

接下来介绍一下将 HBase 改造成 SlimBase 的实现方案,主要是分为两层:

后面将从 HBase 瘦身、适配并实现操作接口以及实现 SlimBaseStateBackend 三个步骤分别进行详细介绍。

■ HBase 瘦身

640 22.png

先讲 HBase 瘦身,主要从减肥和增瘦两个步骤,在减肥方面:

在增瘦方面:

640 23.png

接口层主要有以下三点实现:

适配层主要有以下两个概念:

640 24.png

SlimBaseStateBackend 实现上主要体现在两个方面:

4. 测试结论

640 25.png

上线对比测试后,得出测试结论:

5. 后期优化

640 26.png

目前用的 Compaction 策略是 SizeTieredCompaction,后期要实现基于 OldestUnexpiredTime 的 FiFOCompaction 策略,目标是做到无磁盘 IO 开销。

FiFOCompaction 是一种基于 TTL 的无 IO 的 Compaction 策略;OldestUnexpiredTime 是指例如设置 OldestUnexpiredTime=t2,表示 t2 时刻前的数据全部过期,可以被 Compaction 清理,基于时间点的 FIFOCompaction 理论上可以做到无磁盘 IO 开销。

640 27.png

后续还有四点优化,前三点是基于 HBase 的优化,最后是针对 HDFS 做的优化:

6. 未来规划

640 28.png

从语言、存储、压缩策略、事件事件下推、垃圾回收、检查点时间、重加载时间七个方面来看,SlimBase 都比 RocksDB 更适合快手实时计算任务的开发,未来的规划是对 Slimbase 的性能做进一步优化,愿景是将快手 Flink 上的所有业务场景全部用 SlimBase 替代掉 RocksDB。

作者介绍:

董亭亭,快手大数据架构团队,实时计算引擎团队负责人。目前负责 Flink 引擎在快手公司内的研发和应用实践。2013 年毕业于大连理工大学,曾就职于奇虎360,58集团,接触过的领域包括:分布式计算、调度、分布式存储等。

徐明,快手大数据架构研发工程师。毕业于南开大学,目前在快手数据架构团队,负责 HBase 引擎及周边生态维护和研发。

查看更多:https://yqh.aliyun.com/detail/14987?utm_content=g_1000144995

上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/

上一篇下一篇

猜你喜欢

热点阅读