马蜂窝实时计算平台演进之路
MES 是马蜂窝统一实时计算平台,为各条业务线提供稳定、高效的实时数据计算和查询服务。在整体设计方面,MES 借鉴了 Lambda 架构的思想。本篇文章,我们将从四个方面了解 MES:
1. 关于 Lambda 架构
2.MES 架构和原理
3.MES 优化历程
4. 近期规划
关于 Lambda 架构
Lambda 架构是由 Storm 作者 NathanMarz 根据自己在 Twitter 的分布式数据处理系统经验,提出的一个实时大数据处理框架,具有高容错、低延时和可扩展等特性。
Lambda 架构核心的思想主要可以归纳成两点:
(1)数据从上游 MQ 消息中间件过来后分为 2 路,一路离线批处理, 一路实时处理并有各自的 View 以供查询。
(2)Query 时,对数据做 Function, 结合 Batch View 和 Realtime View,得到最终结果。
具体来说,Lambda 架构将大数据系统架构为多个层次:批处理层(Batch layer)、实时处理层(Speed Layer)、服务层(Serving Layer)。
我们结合一张经典的 Lambda 架构图分别来看:
图 1:Lambda 架构
(图片来源:网络)
批处理层(Batch Layer):批处理层承担的任务是对从上游进来的所有被系统认为不可变的原始数据。类比目前的数据平台架构来看, 即离线的那几张保存原始数据的主表。这 3 张主表是所有完整的数据并且是不可变的,基于这几张主表,数据经过 Batch 、ETL,产生供批处理查询的 Batch View。
加速层(Speed Layer):批处理层虽然可以很好地处理离线数据,但它不能很好满足对于时间粒度的需求。对于需要不断实时生成和实时查询处理的数据,通常会放在加速层来进行实时处理和转化。
加速层与批处理层最大的区别在于,加速层只处理最近的数据,而批处理层处理所有数据。另外在数据的读取方面,为了满足最小延迟,加速层不会在同一数据读取所有新数据,而是在收到新数据时更新 Realtime View,所以我们说,在加速层进行的是一种增量的计算。
服务层(Serving Layer):服务层用于响应用户的查询请求,合并 Batch View 和 Realtime View 中的结果数据集到最终的数据集,并向外对用户通过统一接口,提供实时+离线的数据统计服务。
基于 Lambda 的数据平台架构, 可以按照分层集成众多的大数据组件。在对 MES 的架构设计中,我们借鉴了 Lambda 架构的思想来实现更快
更准、鲁棒性更好的特性。
马蜂窝实时计算平台 MES
为了保证 MES 实时计算平台的性能,我们结合马蜂窝的实际业务场景,主要围绕低延迟,高吞吐、容灾能力和 Exacty Once 的流式语义这四点,来进行架构设计和技术选型。
整体架构设计
对照 Lambda 架构,我们选用 Kafka 作为消息中间件,批处理层选择 Hive、Presto,加速层也就是实时处理层选择 Spark、Flink 等。
图 2:MES 整体架构图
数据从 Kafka 出来后走两条线,一条是 Spark Streaming,支持秒级别的实时数据,计算结果会入库到 Redis 里。第二天凌晨,Redis 中前一天的所有数据 Batch 到 HBase 中;
另外一条是 Flink+Druid,用来处理分钟级和小时级的数据;
上面提供一层 Restful API / Thrift API 封装,供 MES 页面或其他业务通过接口的方式来获取数据;
如果实时数据出了问题,我们会通过 HDFS 中的离线主表进行重算,也是有两条路径:
一是为用户服务的 MES 重算系统,用户可以自助化选取重算规则,提交重算任务。这个任务会被提交到 PrestoSQL 集群,计算结果最终落地到 HBase 里,重算后 MES 的历史数据就会和离线数据算出来的数据保持一致;
另外一条线是 Spark 全量重算,由数据平台的小伙伴内部使用,解决的是基于所有事件组、所有规则的全天数据重算。Spark 会读取配置规则,重算所有前一天的数据后入库到 HBase,保持实时数据和离线数据的一致性;
监控系统是 Grafana,它开放了通用接口给 Python、Java 等语言来上报相关信息,只要按照接口上报要想关注的指标并进行简单配置,就可以查询结果,比如 MES 的延迟时间、一些 Restful 接口的调用量等, 如果出现不正常的情况将通过邮件告警;
最右边是贯穿始终的 MES 规则,我们可以抽象地把它看作是实时的配置流。
MES 实时计算引擎
1. 技术选型
结合马蜂窝的业务需求,我们对三大主流实时计算引擎 Storm、Spark Streaming、Flink 进行了选型对比。
Storm
Storm 是第一代流式计算引擎,实现了一个数据流 (Data Flow) 的模型。我们可以把它想象成一个发射点,一条一条产生数据,形成的数据流分布式地在集群上按照 Bolt 的计算逻辑进行转换,完成计算、过滤等操作,在下游实现聚合。
Storm 的优势是实时性好,可以达到毫秒级。但是它的吞吐量欠佳,并且只能为消息提供「至少一次」的处理机制, 这意味着可以保证每条消息都能被处理,但也可能发生重复。
Spark Streaming
Spark Streaming 不像 Storm 那样一次一个地处理数据流,而是在处理前按时间间隔预先将其切分为一段一段,进行「微批次」处理作业。这样一来解决了吞吐量的问题,但它的实时性就没有 Storm 那么高,不过也可以达到秒级处理。
在流式语义方面,由于 Spark Streaming 容错机制基于 RDD,依靠 CheckPoint,出错之后会从该位置重新计算,不会导致重复计算。当然我们也可以自己来管理 offset,保证 Exactly Once (只算一次的语义) 的处理。
Flink
Flink 是新一代流式计算引擎,国内的阿里就是 Flink 的重度使用和贡献者。Flink 是原生的流处理系统,把所有的数据都看成是流,认为批处理是流处理中的一种特殊情况。数据基于 Flink Stream Source 流入,中间经过 Operator,从 Sink 流出。
为了解决流处理的容错问题,Flink 巧妙地运用了分布式快照的设计与可部分重发的数据源实现容错。用户可自定义对整个 Job 进行快照的时间间隔。当任务失败时,Flink 会将整个 Job 恢复到最近一次快照,并从数据源重发快照之后的数据。Flink 同时保证了实时性和吞吐量,流式语义也做得非常好,能够保证 Exactly Once。
在此之外,组件技术选型的时候在满足自己业务现状的同时, 还需要从以前几个方面考虑:
| 开源组件是否能覆盖需求
| 开源组件的扩展性和二次开发的难度
| 开源组件 API 是否稳定
| 开源组件是否有应用于生产环境的案例,比如多少公司应用于生产环境
| 开源组件社区是否活跃,比如可以看 github,issues,jiras 这些活跃程度
| 开源组件 License 限定问题
| 开源组件之间的耦合问题
2. 设计
下图描述了 MES 实时计算引擎处理数据的过程:
图 3:MES Streaming
数据从 Kafka 源源不断地过来形成数据流,用户通过 UI 配置的一些规则形成实时配置流,数据流和配置流进入到实时计算引擎 Spark Streaming 后进行聚合计算。计算出的实时数据写入到 Redis,历史数据入库到 HBase。UI 目前通过 Restful API 来获取实时和历史数据。
3. 演进
关于 MES 实时计算的引擎,我们主要经历了两次演进。
第一代 :Spark Streaming + Redis + HBase
在设计第一代 MES 时,我们希望可以支持秒级的计算,并且精确计算每一个用户。所以在当时的现状下,我们综合考虑选择了 Spark Streaming。
这个方案计算出来的 UV 是比较精确的。但它有自己的局限性:
首先,这一套架构用到的几个组件其实对资源都比较依赖, 而且 SparkStreaming 对那种时不时的流量高峰的数据处理不是非常友好。数据先在 Spark Streaming 算好然后再入 Redis,最后再入库到 Hbase,数据链路比较长,不好维护。
另外,第一代 MES 只支持自助配置规则,有了规则才会实时计算。所以对于比较自由的 OLAP 交叉分析不友好。而且如果由于集群不稳定等原因导致的任务失败数据少算, 那么不管是用户自助提交 Presto 还是利用 Spark 批处理全量重算,都是一个消耗集群资源的过程。由于批处理重算需要一定的时间来完成对历史数据的修复,这对一些需要数据准确并及时提供的用户不是非常友好。
我们考虑,在数据量大的情况下,我们是不是可以适当牺牲 UV 精准度的计算,来保障整个系统的性能和稳定性。所以就引入了 Flink + Druid。
第二代:引入 Flink + Druid
刚才我们已经简单了解过 Flink,再来说下 Druid。
Druid 是一个大数据实时查询和分析的高容错、高性能的开源分布式系统,用来快速处理大规模的数据,它能够实现对大量数据的快速查询和分析,不足是存在一个 2% 的误差。但事实上,在数据量非常大的情况下,2% 的误差是可以接受的。后面我们又通过 Yahoo 提供的 Data Sketch,实现 UV 计算的精确调控,可以实现在 800w 之下的数据量,UV 都是准确的。最终的计算结果通过 restful 接口提供给 MES 获取数据并展现。
图 4:关于 Druid
Flink + Druid 部分主要是用来处理数据量大、维度多,且不需要精确到秒级的业务数据,比如 Page logdata、mobile page、以及 Server Push。在最近 15 天的数据是可以精确到分钟级别查询的,对于历史数据,粒度越精确,持久化到 Druid 里面的数据量就越大。
在离线批量导入部分,目前 Druid 支持小时级以及 T+1 天级的数据校正。因为如果在 Flink +Tranquility 实时摄取数据这一阶段 task 有异常的话,会导致实时数据到 Druid 有丢失的情况出现。因此根据 Lambda 架构的思想,我们可以用小时级或者天级离线数据对丢失的数据进行重算补全。
对比一下两代计算引擎,Flink + Druid 的引入很好地解决了因为大量数据的 UV 计算带来的压力:
图 5:两代实时计算引擎
MES 优化历程
为了更好地满足业务需求,提升整个系统的性能,我们也在不断对 MES 进行优化,主要包括实时计算集群、计算引擎、查询接口和监控方面。这里主要和大家分享两点。
1. 实时计算集群优化
| Spark,Druid,Flink 集群框架版本升级及相关参数优化;
| Redis,Hbase 节点扩容和参数优化;
| 集群网络,Yarn,Mesos 等资源管理框架调整和优化
2.实时计算引擎优化
数据结构和计算逻辑
对于 Spark 来讲,Prefer 原生数据类型以及数组结构,对于指针类型以及嵌套的结构处理起来性能不是非常友好。因此要注意这一点,妥善优化自己的数据结构。
计算逻辑的部分也要考虑好。比如写 Redis 的时候是事先规划好要存入 Redis 中的数据结构来利用 Akka 并发每条来写入,还是在 Streaming 中算好一批结果最后来一次性写入 Redis,这 2 种方式在性能上还是有很大区别的。
参数优化
(1) 序列化方式首先是 Kyro 的方式,其次才是 Java,序列化的方式不同对网络的传输以及处理起来的性能是有影响的。
(2)Spark 推测执行机制。根据我们集群目前的现状,有各种各样的任务同时在跑,如果遇到集群资源使用高峰期,那么一个 Spark 任务落在比较慢的节点上就会拖累整个 Job 的执行进度。开启推测执行之后,系统会把进程慢的任务主动杀死,然后重新产生一个相同的任务分配到资源充沛的节点上去快速完成它。
(3) 数据本地化。分布式计算有一个经典的理念是:移动数据不如移动计算。比如说我把一个任务分成很多并行的任务,有可能获得的任务刚好需要处理的数据就在处理的节点上,也有可能不是。所以这里有一个本地化等待时间的参数可以控制数据本地化的处理等级并对性能产生很大影响。
另外还用一些关于并行度控制、JVM GC 方面的调优就比较细节了,如果大家感兴趣可以留言给我们交流。
未来规划
马蜂窝实时计算平台的发展还需要不断探索,未来我们主要会在以下几个方面重点推进:
1. 实时计算任务统一资源管理和任务调度
2. 支持复杂的实时 SQL OLAP 计算
3. 实时数据血缘关系及监控预警
4. 复杂实时 CEP 规则系统
感谢关注,欢迎大家扫描下方二维码订阅「马蜂窝技术」内容并推荐给更多热爱技术的朋友,希望有更多机会和大家交流。
微信关注“马蜂窝技术”公众号,阅读更多技术干货。