Flink CDC 在易车的应用实践

2023-04-18  本文已影响0人  Flink中文社区
开发者社区.jpeg

摘要:本文整理自易车数据平台负责人王林红,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分:

  1. Flink 应用场景
  2. DTS 平台建设
  3. Flink CDC + Hudi 应用实践
  4. 未来规划

点击查看直播回放和演讲 PPT

一、Flink 应用场景

1.jpg

Flink 在易车有丰富的应用场景,主要包含实时数仓建设和数据集成。

对于实时数仓建设,主要是数仓实时指标的开发,将离线指标逐步向实时指标过度;同时承接了公司各种实时大屏需求,并多次支持了公司内部的 818 购车节活动。

在实时监控方面,首先是日志监控,主要用来监控埋点情况,另外对于服务器日志,我们也进行统一收集和监控,监控服务响应和异常日志等,同时结合机器学习算法,做日志的聚类;在前端层面,监控前端接口超时、白屏等异常情况。最后也应用在一些业务实时监控、风控等场景。

在数据集成方面,使用 Flink 完成关系型数据库的实时接入,将 MySQL、SQL Server 等的数据实时接入到 Kafka 中;同时将 Kafka 的数据实时同步到 HDFS/Hive 中,实现数据的实时入仓、入湖;对于数据传输通道,我们使用 Flink 将 Kafka 的数据同步到下游存储引擎中,比如 TiDB、MySQL、ClickHouse、HDFS、Doris 等存储中,也实现一些异构数据源数据的实时同步。

二、DTS 平台建设

2.jpg

易车的数据集成主要分为两条线,一条离线数据集成,另一条是实时数据集成。本次主要介绍的是实时数据集成的演进过程,对于实时数据集成,最开始也是处在离线阶段,随着业务的发展,对数据的时效性越来越高,开始使用 Canal 同步 MySQL 的数据,然后使用 Spark 做微批计算,再之后引入 Flink,还是使用 Canal 同步接入 MySQL 数据,使用 Flink 进行数据的实时计算,再之后引入了 Flink CDC,基于 Flink CDC 做全增量一体化实时计算。

3.jpg

在使用 Canal+Flink 的早期阶段,整体流程如下,对于 MySQL 的数据,使用 Canal 通过解析 Binlog 的方式将数据同步到 Kafka 中,对于 SQL Server 的数据,通过 CDC 的方式同步到 Kafka 中,然后通过 Flink 进行加工计算,同步到下游系统如 HDFS 或 Kafka 中,在这个阶段,基本可以满足业务需求,也可以快速完成数据接入及后续开发。

但是也存在一些痛点问题,主要是,整个数据链路比较长,依赖的组件多,运维成本也比较高,另外 Canal 不支持全量数据的同步,全量和增量是割裂的两个阶段,并且对于不同数据源的接入,需要考虑不同的实时接入方案,维护也比较困难。

4.jpg

基于以上痛点问题,和我们的历史经验总结和评估,我们对数据集成工具提出了新的诉求。

5.jpg

基于以上诉求,我们把方案锁定在 Flink 技术栈中,决定基于 Flink CDC 自研实现流批一体的数据集成服务。为什么选择 Flink CDC?

6.jpg

Flink CDC 支持了丰富的数据源,源头支持 MySQL、Mongo、TiDB、Oracle、SQL Server 等,目标端支持 kakfa、Hudi、TiDB、Hive、Doris、ClickHouse 等。

同时 Flink CDC 作为新一代的数据集成框架,不仅可以替代传统的 DataX 和 Canal 做实时数据同步,将数据库的全量和增量数据一体化的同步到消息队列或下游系统中;也可以用于实时数据集成,将数据库数据实时入湖入仓;同时还支持强大的数据加工能力,可以通过 SQL API 或 DataStrean API 对数据库数据进行实时关联、打宽、聚合等。

在 2.0 版本中,Flink CDC 对于 MySQL CDC 支持了无锁读取、并发读取、全程断点续传等高级功能,实现 MySQL 数据的增量快照读取,在最新的 2.2 版本中,将增量快照读取算法抽象成了公共框架,也方便其他 Connector 的接入,其他 Connector 只需要接入这个框架就可以提供无锁算法,并发读取和断点续传的能力,十分方便其他连接器的扩展。

7.jpg

所以我们基于 Flink CDC 构建了 DTS 数据传输平台,在源端,目前已经集成支持了 MySQL、SQL Server、TiDB、Mongo、kafka 等数据源,在目标端,也集成了 Hudi、kafka、Doris、ClickHouse、HDFS、Hive 等数据源,方便业务进行数据实时入湖入仓,和异构数据源的传输、同步。

8.jpg

但是,在 DTS 平台建设过程中,我们也遇到了一些问题,比如元信息的字段映射,如何方便安全的将源库的字段类型映射成 Flink 的字段类型;在任务运行过程中,如何动态的增加新的同步表,包括如果业务源库字段变更了,下游系统如何处理?另外随着任务的增多,如何更好的对数据源信息进行维护,如一个业务库迁移,如何优雅的对任务中的数据源信息进行变更?

9.jpg

首先说元信息自动映射的问题,Flink CDC 支持丰富的数据源,这些数据源都需通过手工的方式映射成 Flink 的 DDL。手工映射表结构是比较繁琐的,尤其是数据源头多、映射关系比较复杂,每种数据源都有自己的映射关系,当表和字段数非常多的时候,手工映射也非常容易出错,对用户不友好,开发效率也不高。

10.jpg

为了解决上述问题,我们开发了统一数据源服务,我们将平台中使用到的数据源统一注册到数据源系统中,实现数据源的统一维护管理,同时实现表结构变更通知,影响分析等。

用户在实时计算平台创建表和创建同步任务时,选择对应的数据源,自动获取表的 Schema,通过模版化的方式创建表和数据同步任务,同时使用数据源 ID 对用户屏蔽连接串和账号密码信息,提升账号安全性。

另外数据源信息与任务信息关联,数据源变更或迁移,只需要修改数据源信息,降低修改成本。最后离线层的数据接入也依赖于统一数据源,离线和实时使用同一套元数据,便于流和批模型的统一。

11.jpg

上图是数据源改造前后的一个对比图,前面是原生的 MySQL 的流表,需要连接串、账号、密码信息,统一数据源之后,在链接串中只需要关注数据源 ID。

同时我们对 Connector 进行了改造,任务在执行时,会将具体的数据源 ID 替换为真实的链接串、账号和密码,对于 Kafka 流表也一样,在表创建时,只需要数据源 ID,任务执行时,会替换为 Kafka 的 Server 地址,对于 groupID,在任务中,通过 set 的方式进行设置。这种方式也方便我们进行后续 Kafka 集群的主备切换。对于其他的数据源,比如 Hbase、HDFS 等也做了类似的改造。

12.jpg

上图是我们自动建表的页面,选择对应的数据源,需要映射的源表,通过数据源服务自动获取源表的 Schema,自动做字段类型映射,通过模版化的方式,一键生成 Flink 的建表语句。

13.jpg

上图是数据同步的配置界面,用户主要选择对应的源和目标数据源、数据表,自动做字段映射,如果目标表不存在,会自动创建目标表。

得益于 MySQL CDC 动态加表功能,也可以在已有任务中,直接增加需要同步的表,添加的表会自动先同步该表的全量数据,然后再无缝切换到同步增量数据。遇到新增监控表时不用新起作业。同时也支持通过正则的方式配置分库分表的同步,另外对于源表字段变更,类型变更等,也做了一些适配。

14.jpg

接下来介绍下平台的整体架构,我们对 DTS 平台、调度平台、实时平台进行了深度的整合和集成,任务调度层集成到统一调度平台中,实现任务的统一管理,主要包含任务的运维管理、权限管理、资源管控、监控告警、和变量管理等。

对于实时平台,主要关注任务的开发、运维、和治理。

实时计算平台上主要支持 SQL 任务、Jar 包任务和 DTS 任务。对于 SQL 任务和 Jar 包任务,提供版本管理、资源管理:资源管理主要是将表、UDF、Connector 等资源统一管理,并通过模版化的方式和配置化的方式完成 Source、Sink 表的创建,降低用户开发成本;对于 TDS 任务,提供数据源管理、任务配置和数据校验等一些模块和服务。

通过服务平台化,打造一站式的任务开发管理平台,在平台上完成任务从开发到测试、发布、监控的全流程处理操作,降低用户使用平台的门槛。

15.jpg

对于核心的 DTS 数据传输架构如上,整个架构主要是基于 Flink 1.14 的 DataStream API 和 Flink CDC 2.2 构建,覆盖流批的场景,实现各种同步需求。

整个架构主要包含 Source 端、数据传输层、Sink 端, Source 和 Sink 端抽象出 SourceFactory 和 SinkFactory,方便实现对接各种类型数据源,在数据传输层,提供统一的基础服务框架,支持类型转换、自定义监控指标、数据校验等功能,如类型转换,DTS 中也支持了对 Canal 数据格式的适配。

目前 DTS 支持了公司内大部分数据传输管道,涵盖数据库,如 MySQL、SQL Server 和 TiDB 等;消息队列,如 Kafka、RocketMQ 等;以及大数据生态系统的各种组件,例如 HDFS、Hive、ClickHouse、Doris 等,覆盖了易车大部分实时流场景和少数离线场景。

16.jpg

这套数据集成架构如今在易车内部已稳定运行近一年时间,服务于众多产品线,整套架构对数据集成,有很大的收益。

三、Flink CDC + Hudi 应用实践

17.jpg

Flink CDC 的一个主要应用场景就是数据实时入湖,对于数据湖我们主要使用的 Hudi,Hudi 的主要特点如下:

18.jpg

使用 Hudi 后,在没有引入 Flink CDC 之前,我们的数据入湖架构如下:

首先使用 Canal 通过解析 Binlog 的方式将增量数据同步到 Kafka 中,然后通过 DataX,将 MySQL 的全量数据同步到 HDFS 上,然后使用 bulk_insert 的方式初始化数据到 Hudi 中,完成全量数据的初始化,最后,使用 Flink 消费 Kafka 的数据,将数据写入到 Hudi 中,同时通过主键解决数据冲突问题。

大家可以看到,这个架构整体的链路比较长,操作频繁、维护成本比较高,涉及的组件比较多,对于完成数据接入工作量比较大,并且稳定性不好保证,如果一旦有数据问题,数据恢复、重导也是一件比较痛苦的事情。

19.jpg

在使用 Flink CDC 之后,结合 DTS 平台,架构如上。在 DTS 平台中,很方便的可以实现 MySQL 数据一键入湖,并且得益于 Flink CDC 全增量一体化框架,不用考虑全增量问题,同时也支持动态增加表功能,操作非常简单。

20.jpg

随着接入表的增多,对于同一个数据源下的数据同步任务,建立了过多数据库连接,导致 Binlog 重复读取,会对数据源库造成巨大的压力。另外有些 Task 同步的数据量很小,也会造成一定的资源浪费。

21.jpg

为了解决这个问题,我们使用 API 的方式读取数据,通过侧输出流的方式对 DataStream 进行分流,实现合并 Source 的功能。对于读取的同一数据源,同一任务只会建立一个数据库连接,Binlog 也只会读取一次,降低了对数据库的压力,方便的实现了单任务多表的数据实时入湖。

22.jpg

在数据实时同步写入 Hudi 时,Flink Hudi 的写入 Pipeline 算子如下。

第一个算子负责将快照数据+增量数据加载到 Flink 状态。接着经过一个 Bucket Assigner,它主要负责将已经转好的 HudiRecord 分配到特定的 File Group 中,接着分好 File Group 的 Record 会流入 Writer 算子执行真正的文件写入。再之后会接一个 Compaction 的算子,主要用来解决 MOR 表读放大的问题。

这个架构在实际的生产环境会遇到如下问题:

23.jpg

为了解决这个问题,我们把 Compaction 进行单独拆分,拆分为一个独立的调度任务,同时为了合并的合理性,对相关的合并计划也做了一些优化。

24.jpg

除此之外,我们还做了一些其他的优化和 bug 修复。

除此之外,还有一些其他优化实践,大部分可以查阅资料或在社区的帮助下解决,在这里再次感谢社区。

四、未来规划

25.jpg

简单介绍一下我们的未来规划:

点击查看直播回放和演讲 PPT

上一篇 下一篇

猜你喜欢

热点阅读