Flink 在米哈游的落地实践
logo.png摘要:本文是来自米哈游大数据部实时计算负责人张剑对于 Flink 在米哈游应用及实践的分享。本篇内容主要分为四个部分:
- 背景介绍
- 实时平台建设
- 实时数仓和数据湖探索
- 未来发展与展望
一、背景介绍
米哈游成立于 2011 年,致力于为用户提供美好的、超出预期的产品与内容。公司陆续推出了多款高品质人气产品,包括《崩坏学园2》、《崩坏3》、《未定事件簿》、《原神》,动态桌面软件《人工桌面》以及社区产品《米游社》,并围绕原创 IP 打造了动画、漫画、音乐、小说及周边等多元产品。总部位于中国上海,并在新加坡、美国、加拿大、日本、韩国等国家和地区进行全球化布局。
Flink 在米哈游大数据发展过程中,一直扮演着重要角色。自实时计算平台建立以来,Flink 作为实时计算引擎,经历了多个发展阶段,实时计算平台也在不断地迭代完善。在米哈游内部,实时计算平台被称作 Mlink,主要以 Flink 为主,兼容 Spark Streaming 任务。从起初的 Flink Jar 包任务为主,发展到以 Flink Sql 为主,不断的降低了使用门槛和提高了任务的开发效率;从起初基础的 Flink 任务开发,发展到跨区域、跨云厂商的任务多版本管理,满足了业务发展的需求。在发展的过程中,米哈游不断地关注着社区的发展,并同社区和阿里云同学保持密切的联系。
Mlink 主要是基于 Yarn 资源管理的计算平台,支持了数仓、算法、推荐、风控、大屏等业务。任务数 1000+,Sql 任务占比 80% 左右。使用的 Yarn Vcores 超 5000 核,内存 10T 左右,其中单个任务峰值吞吐在 500 万 QPS,每天吞吐的数据规模超千亿。
二、实时平台建设
2.1 遇到的问题
在 Flink 探索发展的过程中,都会遇到 Flink 使用的一些痛点,大家遇到的,同样在我们探索和实践的过程中也有所感触。总结起来,大概是以下五个方面:
- 一是 Jar 任务的开发成本高,对于不熟悉 Flink 代码的同学来说使用成本过高。同时,Jar 任务维护成本高,一些代码逻辑的改动会涉及到重新打包、上传,上线等动作;
- 二是任务管理功能缺失,其中多租户、历史版本回溯、开发版本和线上版本管理、UDF 管理、血缘管理是实时平台管理的重要内容;
- 三是 Flink 引擎本身管理,主要涉及到多 Flink 版本管理,任务参数配置、常用 Connector 的二次开发、多资源环境管理等问题;
- 四是任务的告警监控管理,任务问题诊断;
- 五是同离线数仓互通,包括 Hive Catalog 管理,实时和离线调度依赖管理等。
上面的五个问题,可能是普遍的问题,所以各家公司都会基于内部自建或者开源项目二次开发,来满足自身任务开发管理需求。对于米哈游,除了上述五个问题,还存在跨区域、跨云厂商中遇到的问题需要解决,主要是跨区域之后,任务上线和提交效率,跨云厂商,资源环境不一致等。
2.2 解决方案
实时平台建设主要围绕如上问题。目前实时平台架构如下:
img图 1:多云多环境实时平台架构
前端控制云环境的切换。Backend Service 主要负责用户权限管理、任务的多版本管理、血缘管理,任务运维,任务上下线,任务监控和告警等工作。Executor Service 主要负责任务解析、任务提交运行、任务下线和同各类资源管理器交互等工作。其中,Backend Service 到 Executor Service 通过 Thrift 协议通信,Executor Service 的实现可以多语言扩展。架构设计主要解决跨地区跨云厂商问题,实现任务管理和任务运行之间解耦。
img图 2:Mlink 平台开发页面
img图 3:Mlink平台运维页面
img图 4:Mlink 平台同步任务页面
Mlink 实时计算平台主要设计了概览、开发、资源管理、运维、数据探查、同步任务、用户管理和执行器管理等模块。其中开发页面主要是用户编写任务和参数配置,包含历史版本管理等内容。资源管理主要是 Jar 包任务和 UDF 管理。运维主要是任务启停、任务运行监控、任务告警配置等。数据探查部分主要是预览部分数据功能,比如 Kafka Topic 支持按分区、按时间或者 Offset 预览数据。同步任务主要是为了方便管理同步任务,比如 CDC 到 Iceberg 一键同步和运行管理。执行器负责 Executor 的运维工作,包括 Executor 上下线,健康状态监控等。
2.3 遇到的挑战
平台建设和迭代过程中,我们遇到了不少的挑战,也产生了一些比较好的实践。主要分享四个方面。
第一是 Executor Service 开发和维护方面。
Executor 主要涉及到 Jar 和 Sql 任务解析提交部分。一开始的方案为了解决跨地区传输效率问题,特别是大的 jar 包传输,由后端进行任务解析,最后传输 job graph 到 Executor,Executor 再通过资源管理器 Api 提交,这个因为后端解析环境不一致问题,部分任务解析过程中会存在 action 动作,特别是涉及到 Hive 表和 Iceberg 表部分。最后采用后端不执行,改由 Executor 解析的方案。Executor 在解析过程中,遇到了 Executor 在运行很长一段时间后,会出现元空间 OOM 的情况。这个主要是因为 Executor 不断的加载任务需要 Class 类,会导致使用的元空间内存不断增加。这个主要是通过任务解析完成之后,卸载类加载器和堆 GC 设置来解决。
第二是监控方面。
监控采用的是 Influxdb 加 Grafana 的方案。随着任务量的不断增加,Influxdb 存储的 Series 超过百万,影响监控查看的稳定性,查询响应缓慢。一是扩展 Influxdb,执行端通过一致性 hash 的方案,分配任务 Metric 上报到不同 Influxdb。本身通过对 Flink 任务上报 Metric 进行一定程度的精简。其次在监控上,比如 Kafka 消费监控,目前是支持消费条数的延迟监控,自定义了 Kafka 消费延迟时间的监控,主要是采集了 Kafka 最慢并行度消费的时间,能够反映 Kafka 消费的最大延迟时间,能够反映某个时间点的数据一定被消费了。
img图 5:Grafana 监控示例
第三是 Connector 二次开发方面。
在 CDC 1.0 版本基础上迭代,支持 Mysql 采集的时候动态扩展字段和基于时间启动消费位点、采集的库表、位点等 Schema 信息。在 CDC 2.0 版本基础上,增加了全量读取库表流控和不需要 MySQL 开启 Binlog 的全量初始化功能。其中多 CDC 实例同步可能会对上游 Mysql 造成压力,采用了 Kafka 作为数据中转,根据库表主键字段作为 Topic 的 Key,保证 Binlog 的顺序,在下游不会出现数据乱序。
Iceberg 作为数据湖方案,改造的点主要是 Iceberg V2 表的支持上面,也就是 Upsert 表。建立 Iceberg 管理中心,会根据合并策略定期优化和清理,Flink 写入主要保证在 CDC 到 Iceberg V2 表顺序性,在如何减少 Delete File 上,在 Iceberg 写入上增加了 BloomFilter 的支持,能够显著减少 Delete File 大小。Iceberg 管理中心,支持了 V2 表合并和 Flink 提交冲突问题。
Clickhouse 方面,重构了 Clickhouse 写入代码,优化了 Clickhouse 的写入性能,支持了本地表和分布式表写入。
第四是数据入湖和离线调度方面。
实时平台集成了 Iceberg,并支持 Iceberg Hadoop、Hive、Oss、S3 多种 Catalog。CDC 到 Iceberg 入湖链路已经在部门生产业务上线使用。在数据入湖或者入仓中,如果下游表有被离线数仓用到的地方,都会有依赖调度问题,离线任务何时启动?目前我们主要通过计算任务的延迟时间和 Checkpoint 时间来确保数据已经入仓入湖。以 CDC 或者 Kafka 到 Iceberg 为例。首先采集 CDC 端采集延迟时间,Kafka 采集最慢并行度延迟时间,同时采集任务 Checkpoint 时间。现在的 Checkpoint 完成,Iceberg 版本不一定会更新,基于此,对 Iceberg 写入进行了改造。这样一个同步任务,如果 CDC 采集端没有延迟,Checkpoint 也已经完成,可以保证某个小时的数据一定已经入仓。实时平台提供任务延迟查询接口。离线调度以此接口为调度依赖节点。这样就保证了离线任务启动时候,入仓数据的完整性。
三、实时数仓和数据湖探索
实时数据采集,目前主要是三条链路:
- 一是日志类型,主要是通过 Filebeat 采集写入 Kafka,Es 作为 Filebeat 的监控;
- 二是 Api 接口上报服务,后端接入 Kafka;
- 三是 CDC 采集全量加增量 Mysql 数据,写入 Kafka 或者直接写入 Iceberg。之前是采用 Canal 作为增量采集方案,现在已经全部改为了 CDC。
实时数仓架构设计和业内基本一致,包括 ODS、DWD、DWS 层,之后输出到各应用系统,比如 Clickhouse、Doris、Mysql、Redis 等。目前主要以 Kafka 作为中间承载,也在探索 Iceberg 作为中间层的使用。Iceberg 虽然具有流读功能,但是流读时候数据的顺序性问题,一直没有较好的方案解决,我们也是在探索过程中。探索的主要方向有两个:
- 一是将 Kafka 和 Iceberg 作为混合 Source 方案,Flink 任务读取混合 Source 之后,基于 Iceberg 快照记录的 Kafka 位点,确定读取范围和切换点;
- 二是社区 Flip-188 提出的引入 Dynamic Table 存储实现。Flink 内置表由两部分组成,LogStore 和 FileStore。LogStore 将满足消息系统的需要,而 FileStore 是列式格式文件系统。在每个时间点,LogStore 和 FileStore 都会为最新写入的数据存储完全相同的数据 (LogStore 有 TTL),但物理布局不同。
在实时数仓探索方面,主要是 CDC 到 Iceberg 入湖任务,已经在生产上使用。其中主要解决了四个问题:
- 一是 CDC 采集问题,特别是多库多表采集,会集中采集到 Kafka,减少多个 CDC 任务对同一数据库影响;
- 二是 Iceberg 支持 V2 表写入,包括写入的索引过滤减少 Delete 文件,Iceberg 管理中心合并和提交冲突;
- 三是支持分库分表的数据校验和数据延迟检查;
- 四是一键式任务生成。对于用户而言,只需要填写数据库相关信息,目标 Iceberg 表库名和表名,并支持使用 Kafka 中转,避免多个 CDC 实例采集同一个数据库实例。
通过上述四个问题的解决,能够达到数据库数据分钟级数据入湖,入湖的数据校验和数据延迟依赖达成,方便下游离线任务调度启动。
img图 6:数据入湖链路
四、未来发展与展望
主要有四点:
- 一是 Flink 动态表存储能够尽快实现落地,实现真正的实时数仓和流表一体;
- 二是 Flink 任务动态扩缩容、基于任务诊断的主动资源调整、细粒度资源调整;
- 三是 Flink 对批任务的读写优化,目前批任务 Flink 的使用面不如 Spark,如果未来能够在此补足,可以做到流批操作一个引擎,开发成本会显著降低;
- 四是 Flink 加数据湖更好的落地推广。