基于 Flink CDC 的实时同步系统

2023-03-12  本文已影响0人  Flink中文社区

摘要:本文整理自科杰科技大数据架构师张军,在 FFA 数据集成专场的分享。本篇内容主要分为四个部分:

  1. 功能概述
  2. 架构设计
  3. 技术挑战
  4. 生产实践

点击查看直播回放和演讲 PPT

1.jpg

科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等各个行业。科杰科技产品的底层是基于湖仓一体的基础数据平台,在数据平台之上有离线、实时、机器学习等各种系统。我主要负责基于 Flink、Iceberg、K8s 的底层基础设施建设。今天将主要和大家分享,上图中框出来的子系统,即基于 Flink CDC 的实时数据同步系统。

一、功能概述

2.jpg

我们系统的主要的功能有如下几个:

3.jpg

市面上有很多实时同步的系统,最终我们选用了 Flink CDC 做实时同步系统的底层技术架构。主要是因为 Flink CDC 有一些独有的优势,包括全量同步、增量同步、全量+增量同步,还有底层基于 Flink 做的分布式计算引擎。

通过 Flink CDC 这套架构,想实现我们现有产品的需求,目前来看还有一些不足。

二、架构设计

4.jpg

接下来从技术角度给大家分享一下我们系统的设计架构,从上图中可以看到,一共分为三层。

最上面一层是输入端。基于 Flink CDC API 的方式读数据库进行数据抽取,然后把这些数据和 Schema 的信息发到中间的 Kafka,Kafka 是我们的中间缓冲层。最下面一层是输出端,会从 Kafka 读取输入端输入的数据。

在输出端这一层可以看到,首先进行过滤,常用的 SQL 表达式都可以做过滤条件。过滤后对字段应用一些 UDF,比如数据脱敏、加密等等。接下来根据 DB 和 Table 对数据进行 Keyby 分组,然后使用 KeyedProcessFunction 函数对每个表的数据进行一些处理,比如创建表、添加或者修改字段、插入数据等等。

当配置完任务之后,最后我们分别把 Source 和 Sink 的任务提交到运维中心,运维中心会对任务进行启动、停止、查看统计指标、查看任务状态等一系列操作。最后我们的任务支持在 Yarn 和 K8s 上运行,用户可以根据自己的情况进行选择。

5.jpg

在后台管理系统,用户可以通过配置输入端和输出端,配置需要同步的任务。任务会生成两个配置文件,分别是输入端的配置文件和输出端的配置文件,然后这两个配置文件会分别作为输入端和输出端的启动参数传给两个 Flink 任务。

6.jpg

这部分主要是想分享下,对于无法获取 DDL 事件的情况我们该如何处理呢?

其实有一些数据库,比如 MySQL,是可以通过 Flink CDC 来获取 Schema 的变更信息的,但是为了代码的逻辑统一,同时适配 Flink CDC 拿不到 Schema 变更的数据库。我们做了代码统一的处理,用一套架构完成数据和 Schema 的抽取和封装。

我们通过 JDBC 的方式,从源数据库把 Schema 的信息查出来,放到 Flink 的 State 里。当下一条数据来的时候,跟 State 里面的 Schema 数据进行对比。相同就不做任何处理,不同就再次查询一下 Schema 的信息,更新到 Flink State 里。同时将从 Flink CDC 拿到的数据和这条数据对应的 Schema 信息,封装成消息体,发送给中间层的 Kafka。从 Schema 读取的信息包含数据的类型、长度、精度,是否是主键等等,格式和 debezium-json 差不多。

Kafka 缓冲层可以用来实现以下几个功能。

在解耦方面:

在 DB 对应 Topic 方面:

7.jpg

输出端和输入端一样,读取后端生成的配置文件作为它的参数,然后使用一些过滤条件,UDF 转换条件等等,从 Kafka 读取数据,进行数据处理。

在数据处理的时候,因为每个输出源的处理逻辑不一样,所以分成以下三类。

8.jpg

运维中心可以对数据进行如下处理:

三、技术挑战

9.jpg

下面列举一些主要的技术挑战。

10.jpg

这是我们在开发过程中,输出端遇到的第一个问题,也就是 SQL 条件的过滤。大家可能乍一听觉得很简单,加一个 where 条件就行了,但 Flink 任务在做数据同步时,它要求输入端和输出端的 Schema 需要预先提前知道,且它是固定不变的,但是我们的情况有一些不同,比如对于整库同步的过程中,用户新增了一些表,或者在表同步的过程中,新增了一些字段,Flink 现有的 collector 无法识别这些新增的信息,无法在未知的字段上添加 where 条件。那么我们要如何解决这个问题呢?

我们发送到中间 Kafka 缓冲层的数据格式和 debezium-json 的格式差不多,数据主要存储在 payload.after 和 payload.before 里面,这里面的数据的格式是 map 类型,它的 key 是字符串,value 是 object 类型的数据,但是这个格式我们无法把它映射成 Flink SQL,因为 object 类型在 Flink CDC 里面没有对应的类型,所以我们把 object 类型映射成了 string 类型,并对 SQL 进行了一些转换。使用 Flink SQL 解析器把 where 条件进行解析,然后重新生成新的过滤条件。

比如我们原始的过滤 SQL 是这样的:

id between 1 and 3.5

经过我们的重构,变成了下面这个形式:

cast(payload.after['id'] as DECIMAL(2,1)) BETWEEN ASYMMETRIC 1 AND 3.5

11.jpg

数据经过 where 条件的过滤之后,并且经过 UDF 函数转换进入 KeyedProcessFunction 函数进行处理。第一步先判断输出端的目标库和目标表是否已经存在。在没有存在的情况下,用纯 JDBC 的方式拼接 SQL 执行 DDL,创建数据库和表。然后进行数据处理,为了提高性能。我们把数据放到队列里,当队列达到一定的阈值后,进行 flush 操作,把数据批量写入数据库。

在这个同步过程中,对于 Schema 的处理和 Source 端一样,把获取的 Schema 信息放到 State 里,每来一条数据进行一次 Schema 对比。如果发生了变更,就能证明数据发生了 DDL 的操作。这个时候要刷数据,把队列里的数据 flush 到数据库,然后执行 DDL,执行完 DDL 之后重新拼接一个 INSERT INTO 的 SQL 执行新插入的数据。通过这种方式实现不重启 Flink 任务的情况下,同时支持 DDL(create、alter)和 DML(insert、update、delete)等一系列操作。

12.jpg

因为 Iceberg 无法用纯 JDBC 的方式写入,所以它无法跟关系型数据结合到一起。因此 Flink 写入 Iceberg 会遇到以下的一些问题。

13.jpg

我们发现 Flink 不管用 SQL 还是 API 的方式,都无法完成我们的需求,所以我们从更底层的角度来考虑实现方法,最后使用 Iceberg 很底层的 API 来实现我们所需要的功能。

比如 Create Table 就是使用 Iceberg 里的 Catalog 来创建 Table 的,包含一些主键和 Schema。其他的操作,包括修改表的 Schema、写入数据、提交快照等都是用纯 Iceberg 的底层 API 来实现,没有使用现有的 Flink Iceberg API 来做,这样实现起来更加灵活。

14.jpg

在业务上,我们也会面临很多复杂的业务场景,比如对同一字段,我们会有很多种操作。比如需要支持 UDF;对字段加过滤条件;字段的映射;添加常量字段;开启字段同步等等。所以我们在写逻辑的时候,要考虑各种各样复杂的条件。因为可能改了其中某一个功能进而就影响了其他功能。

四、生产实践

15.jpg

我们系统上线后,目前已经服务于十几个客户,涉及到金融、能源等各个行业。支持的数据源包括 MySQL、PostgreSQL、Oracle、SQL Server 等。数据规模方面,目前客户用于同步的任务从几个到几十个库不等,每秒同步数千条数据。

16.jpg

未来我们将在以下三方面进行提升:

点击查看直播回放和演讲 PPT

上一篇 下一篇

猜你喜欢

热点阅读