Flink 在 58 同城的应用与实践
Flink 在 58 同城应用与实践 FL.jpg本文整理自 58 同城实时计算平台负责人冯海涛在 Flink Forward Asia 2020 分享的议题《Flink 在 58 同城应用与实践》,内容包括:
- 实时计算平台架构
- 实时 SQL 建设
- Storm 迁移 Flink 实践
- 一站式实时计算平台
- 后续规划
一、实时计算平台架构
实时计算平台的定位是为 58 集团海量数据提供高效、稳定的实时计算一站式服务。一站式服务主要分为三个方向:
- 第一个方向是实时数据存储,主要负责为线上业务接入提供高速度的实时存储能力。
- 第二是实时数据计算,主要为海量数据的处理提供分布式计算框架。
- 第三是实时数据分发,主要负责将计算后的数据分发到后续的实时存储,供上层应用。
平台建设主要分为两个部分:
-
第一部分是基础能力建设,目前主要包括 Kafka 集群、storm 集群、 Flink 集群、SparkStreaming 集群。
-
另一部分是平台化建设,主要是包括两点:
-
第一个是数据分发,我们的数据分发是基于 Kafka Connect 打造的一个平台,目标是实现异构数据源的集成与分发。在实际使用数据场景过程中,经常需要将不同的数据源汇聚到一起进行计算分析。
传统方式可能需要针对不同的存储采用不同的数据同步方案。我们的数据分发是通过提供一套完整的架构,实现不同数据源的集成和分发。
-
第二个是我们基于 Flink 打造的一站式实时计算平台,后文会有详细的介绍。
-
上图是我们的实时计算平台的架构。
-
在实时数据接入这部分,我们采用的是 Kafka,binlog 提供 canal 和 debezium 两种方式进行接入。
-
在业务日志这部分,我们主要采用 flume 进行线上业务的 log 的采集。
-
在实时计算引擎这部分,根据开源社区发展以及用户的需求,从最早的 Storm 到后来引入 SparkStreaming,以及现在主流的 Flink。
-
在实时存储这部分,为了满足多元化的实时需求,我们支持 Kafka、Druid、Hbase、ES、ClickHouse。
-
同时在计算架构之上,我们建设了一些管理平台,比如集群管理,它主要负责集群的扩容,稳定性的管理。
-
另一个是 Nightfury,主要负责集群治理,包括数据接入、权限治理、资源管理等等。
img
我们在业务发展过程中,引入了 Flink 计算框架。首先从业务来说,58 是一个一站式生活服务平台,包含很多业务线。随着业务的发展,数据量越来越大,场景越来越丰富,需要一个更加强大的计算框架来满足用户的需求。
-
第一个场景是实时 ETL,主要是针对原始日志进行信息转化,结构化处理,运用于后续计算,需要高吞吐低延迟的计算能力。
-
第二块是实时数仓,它作为离线数仓的一个补充,主要是提升一些实时指标的时效性。第三种场景是实时监控,它需要比较灵活的时间窗口支持。
-
最后一种场景是实时数据流分析,比如说,数据乱序的处理、中间状态的管理、Exactly once 语义保障。
我们前期基于 Storm 和 SparkStreaming 构建的计算集群在很大程度上并不能满足这些场景需求。于是对 Flink 进行了调研,发现 Flink 不论是在计算性能,还是流数据特性支持上,都体现出了非常大的优势。因此,我们决定采用 Flink 作为主流的计算框架。
img上图是我们 Flink 集群的建设情况。Flink 作为实时计算框架,经常需要 7×24 小时的可用性。我们在建设底层集群的时候,需要考虑高可用的架构。
-
首先在部署模式上,主要是采用 Flink On YARN,实现集群的高可用。
-
在底层的 HDFS 上,采用 HDFS federation 机制,既可以避免离线集群的抖动对实时这边造成影响,同时也减少了维护的 HDFS 数量。
-
在集群隔离上,主要是采用 Node Labe 机制,就可以实现把重要业务运行在一些指定节点上。同时在这个基础之上,引入了 Cgroup,对 CPU 进行隔离,避免任务间的 CPU 抢占。
-
在管理层面,不同的业务提交到不同的队列进行管理,避免业务间的资源抢占。
-
在计算场景上,根据不同的计算场景,比如说计算型、IO 型,会提交到不同的节点,从而提升整个集群的资源利用率。
Flink 计算框架在 58 经历了大概两年多的发展。目前我们的集群有 900 多台机器,2000 多个实时任务,每天处理大概 2.5 万亿的实时数据,数据量峰值达到了 3000 万每秒。
二、实时 SQL 建设
1. 实时 SQL 演进
SQL 编程具有低门槛、自动优化、版本统一等特点。同时 Flink SQL 作为实时数仓的主要工具,是我们在建设 Flink 平台时考虑的一个主要方向。
我们最早上线的 Flink 是基于 1.6 版本的,当时这个版本只支持 DML,我们在当时的版本基础上进行了一些扩展,主要是在 DDL 语法上的扩展支持。在用户使用层面,为了简化 DDL 的定义,也通过一个配置化的方式来实现自动生成 DDL。在开发的时候,提供可视化开发的功能和在线调试的功能。
随着社区的开源,我们将 Flink SQL 切换到了社区版本,之后也升级相关的版本,以及合并比较多的社区版本特性,比如说 Blink 相关、批流合一、对 Hive 的支持。
最后针对 Flink SQL 这块的实时数仓,也做了一些数仓化的工作,主要包括元数据管理、血缘关系、数仓分层、权限管理等等。
img2. 存储扩展
关于存储扩展这一块,最开始我们是基于 Flink 自己实现的一套 DDL。随着社区开源,切换到社区的 Flink SQL 版本,然后在上面做了一些扩展,主要有几个方面:
- 第一,打通了主流存储和内部的实时存储。比如说,在源表上支持了内部的 wmb,它是一个分布式消息队列。在维表上支持这种 redis,内部的 wtable。在结果表上支持了 ClickHouse,redis,以及我们内部的 wtable;
- 第二,定制 format 支持。因为在实际业务中,很多数据格式并不是标准的,没法通过 DDL 来定义一个表。我们提供了一种通用的方式,可以采用一个字段来代表一条日志,让用户可以通过 udf 去自定义,并解析一条日志。
- 最后,在 source 和 sink DDL 定义基础上,增加了并发度的设置。这样用户就可以更灵活地控制任务的并发。
3. 性能优化
关于性能优化,主要是两方面:
-
第一个是对 Blink 特性的引进,Blink 提供了大量的特性,比如通过 mini batch 的处理方式,提高任务的吞吐。通过 local global 两阶段聚合,缓解数据热点问题。还有通过 emit,增强窗口的功能。把这些功能集成到我们的计算平台,用户通过一些按钮可以直接打开。
-
另一个是对异步 lO 的应用。在实时数仓化建设过程中,维表之间的关联是比较大的应用场景,经常因为维表的性能导致整个任务的吞吐不高。因此我们增加了一个异步 IO 的机制,主要有两种实现:
-
一种针对目标存储支持异步 client,直接基于异步 client 来实现。比如 MySQL 和 redis。
-
另一种不支持异步 client 的,我们就借助现成的机制来模拟,同时在这个基础之上增加了一套缓存的机制,避免所有的数据直接查询到目标存储,减少目标存储的压力。同时在缓存基础上,也增加 LRU 机制,更加灵活的控制整个缓存。
同样,数据写入这一块遇到大并发量写入的时候,尽量提高并发来解决写入性的问题,这样就会导致整个任务的 CPU 利用率比较低,所以就采用单并发度多线程的写入机制,它的实现是在 sink 算子里面增加一个 buffer,数据流入到 sink 之后会首先写入到 buffer,然后会启动多线程机制去消费这个 buffer,最终写到存储里面。
-
4. 数仓化建设
实时数仓作为 Flink 的一个比较典型的应用场景,相较于离线数仓它可能存在一些平台化不完善的方面:
-
首先,元数据管理功能不完善。
-
然后,Flink SQL 这一块,对于每个任务我们都可能需要重新定义一个数据表。并且由于数据没有分层的概念,导致任务比较独立,烟囱式开发,数据和资源使用率比较低下。
-
另外,也缺乏数据血缘信息。
为了提升实时数仓建设的效率,我们提供了面向数仓化实时 SQL 能力,在数仓设计,任务开发,平台化管理方面全面对齐离线数仓的建设模式。
img4.1 数仓化
数仓化主要是参考离线数仓的模型,对我们实时数仓这一块进行模型建设。
比如说,最原始的数据会进入ODS 层,经过一些清洗落入到行为明细层,之后会拆分到具体的主题明细层,然后再将一些相关的维表信息进行计算,再到汇总层,最终提供给最上层的应用,包括一些实时报表,Ad-hoc 查询等。
img4.2 数仓平台
实时数仓目前主要还是基于这种 Lambda 架构来进行平台化的建设。
-
首先,在元数据管理这一块,Flink 默认采用内存对元数据进行管理,我们就采用了 HiveCatalog 机制对库表进行持久化。
-
同时我们在数据库的权限管理上,借助 Hive ACL 来进行权限管理。
-
有了元数据持久化之后,就可以提供全局的元数据检索。
-
同时任务模式就可以由传统的 DDL+DML 简化为 DML。
-
最后,我们也做了血缘关系,主要是在 Flink SQL 提交过程中,自动发现 SQL 任务血缘依赖关系。
三、Storm 迁移 Flink 实践
1. Flink 与 Storm 对比
Flink 相对于 Storm 来说,有比较多的优势。
-
在数据保障上,Flink 支持 Exactly once 语义,在吞吐量、资源管理、状态管理,用户越来越多的基于 Flink 进行开发。
-
而 Storm 对用户来说,编程模型简单,开发成本高,流式计算特性缺乏,吞吐低无法满足性能。在平台侧,独立集群多、运维困难、任务缺少平台化管理、用户体验差。
因此我们决定迁移到 Flink。
img2. Flink-Storm 工具
在 Storm 迁移到 Flink 的时候,如果让用户重新基于 Flink 进行逻辑开发,可能需要比较大的工作量。因此我们对 Flink 进行了调研,发现有个 Flink-Storm 工具。它实现了将 Storm Topology 转到 Flink Topology。比如说,把 spout 转换到 Flink 的 source function,把 bolt 转换到 Transform 和 sink function。
在使用的过程中我们也发现一些问题,Flink-Storm 工具无法支持 Yarn 模式, 缺少 Storm 引擎功能,最后还有一个比较大的问题,我们的 storm 在发展过程中维护了很多版本,但是 Flink-Storm 工具只支持基于一个版本进行开发。于是,我们做了一些改进。
img3. 对 Flink-Storm 的改进
3.1 消息保障
Storm 有三个特点:
- 第一,ack 机制;
- 第二,依赖 zookeeper;
- 第三,at least once 语义保障。
我们做了四点改进:
- 第一,Flink-Storm 去掉 ack 支持;
- 第二,KafkaSpout 实现 CheckpointListener;
- 第三,KafkaSpout 实现 CheckpointedFunction;
- 第四,Flink-Storm 打开 checkpoint。
3.2 对 Storm 定时器的支持
在早期版本里面其实是没有窗口机制的,我们借助 Storm 定时机制来实现窗口计算。它的机制是这样的,Storm 引擎会定时向 bolt 里面发送一个系统信号,用户就可以通过这个系统信号进行一个切分,模拟窗口操作。
同样,Flink 也没有这样一个定时器的机制,于是我们就考虑从 Flink-Storm 层面来实现,改造了 BoltWrapper 类,它作为 bolt 类的一个封装,实现机制跟 bolt 是一样的,包括 5 点:
- 初始化 open 方式启动异步线程。
- 模拟构造 tick 的 StreamRecord;
- 调用 processeElement 函数发送 tuple;
- 频率由外部参数全局控制;
- close 中关闭线程。
3.3 Storm on Yarn
Storm on yarn 并不是直接提交到 YARN 集群,它只是提交到 local 或者 stand alone 的模式。Flink on yarn 主要是提供了 ClusterClient 这样一个代理,实现方式有三个步骤:
-
初始化 YarnClusterConfiguration Flink 配置 执行 jar 包 / 资源配置 加载 classpath;
-
启动 yarn client;
-
复用 Flink on yarn 机制 deploy 转换后的 jobGraph。
4. 任务迁移
在完善上述的一些改进之后,迁移就比较容易了。首先我们会把改造后的版本打包,上传到公司的私服上。然后用户在他的工程里面只需要引入 jar 包。在代码这一块,只需要将原来基于 storm 的提交方式改造成基于 Flink 的提交方式,逻辑是完全不用动的。在任务部署模式这一块,也提供了 Flink 提交的模式,这样一个脚本可以实现 Flink Perjob 模式。
图片总结一下,除了一些比较极端的复杂情况,基本上做到了无缝迁移所有的任务。迁移到 Flink 之后,大部分任务的延迟都降低到毫秒级别,整个吞吐提升 3~5 倍。同时,整体资源节省了大概 40%,约等于 80 台机器。完成了 5 个 storm 集群完全下线,实现了任务平台化管理。
图片四、一站式实时计算平台
1. Wstream 平台
我们为了提升管理效率而打造了 Wstream 平台,它构建在底层引擎和上层应用之间,对用户可以屏蔽底层的集群信息,比如跨机房多集群的一些信息。
-
在任务接入方式上,支持 Flink Jar,Flink SQL,Flink-Storm,PyFlink 这 4 种方式,来满足多元化的用户需求。
-
在产品功能上,主要支持了任务管理、任务的创建、启动删除等。
-
另外,为了更好的让用户管理自己的任务和对任务进行问题定位,我们也提供了一个监控告警和任务诊断的系统。
-
针对数仓,提供了一些数仓平台化的功能,包括权限管理、血缘关系等等。
-
针对 Flink SQL 也提供了调试探查的功能。
用户可以在 Wstream 平台之上很好的去构建他们的应用。
图片2. 状态管理
状态作为 Flink 一个比较重要的特性,在实际场景中有大量的应用。用户在使用平台的时候,没法跟底层的 Flink 工具进行交互,于是我们就将底层的一些能力进行了集成。
-
在任务保存方面,支持 Checkpoint,Savepoint,Cancel With Savepoint。
-
在容错方面,支持 allowNonRestoredState,跳过无法恢复的状态。
-
在分析方面,支持 Queryable State 实时查询,基于离线的 State Processor 的分析方式,我们会帮用户把这个状态下载进行分析。
对于整个任务状态管理来说,我们通过 jobgraph 设置定向到指定 Hdfs 目录,进行统一目录管理。在状态小文件这块,控制并发度,jobgraph 优化,checkpoint 间隔时间,保留版本数量。
图片3. SQL 调试
针对 Flink SQL,我们也提供了一些调试功能。这里主要包括两块:
-
第一,语法层面的功能包括:
- 智能提示;
- 语法校验;
- 转换 graph 逻辑校验。
-
第二,逻辑层面的功能包括:
- 模拟输入,DataGen 自定义数据源;
- 结果输出,Print 重定向到标准输出。
这样我们可以更方便的对整个业务逻辑进行调试。
图片4. 任务监控
关于任务监控,对于 Flink 实时计算任务来说,我们主要关心的是任务的稳定性、性能方面、以及业务逻辑是否符合预期。对于如何监控这些指标,主要包括 4 个层面:
-
第一个是 Flink 自带的 Flink-metrics,提供大量的信息,比如流量信息、状态信息、反压、检查点、CPU、网络等等;
-
第二个是 yarn 层面,提供运行时长、任务状态;
-
第三,从 kafka 层面提供消息堆积;
-
最后,通过用户自定义的一些 metrics,我们可以了解业务逻辑是否符合预期。
5. 监控体系
为了采集这些指标,我们也基于 Prometheus 搭建了一套监控体系。对于所有的 Flink 任务,会实时将 metrics 推到 pushgateway,然后会将收集到的指标推到 Prometheus,这一块我们主要是采用的 federation 的机制。所有子节点负责指标采集,之后汇聚到一个中心节点,由中心节点统一对外提供服务。最终可以实现整个指标的计算和告警。
图片6. 监控告警
有了上面这些指标之后,我们在告警这一块就可以比较方便。针对实时计算比较关注的任务稳定性方面,我们可以从 Topic 消息消费堆积、任务计算 qps 波动、Flink task Restart、Flink Checkpoint failed、任务失败、延迟等信息来观察整个任务的运行情况。
图片7. 指标可视化
在指标可视化这一块,主要是两个层面:
-
第一个层面是 Job 层面,这一块主要是把一些比较核心的指标汇聚到我们的实时计算平台。比如说,qps 信息、输入输出的信息、延迟的信息等等;
-
对于更底层的 task 级别的 metrics,通过 Grafana 可以了解具体的一些task信息,比如流量信息、反压信息等。
五、后续规划
我们的后续规划,主要包括 4 个方面:
- 第一个是社区比较流行的批流合一。因为我们当前这个实时架构大部分还是基于 Lambda 架构,这种架构会带来很大的维护工作量,所以我们也希望借助批流合一的能力来简化架构;
- 第二个是资源调优,因为作为流式计算来说,缺少一些动态资源管理的机制,因此我们也希望有手段来进行这样一些调优;
- 第三个是智能监控,我们当前的监控和告警是事后的,希望有某种方式在任务出现问题之前进行预警;
- 最后是拥抱社区的新能力,包括对新场景的探索。