大数据计算

伴鱼基于 Flink 构建数据集成平台的设计与实现

2021-12-09  本文已影响0人  Flink中文社区

数据仓库有四个基本的特征:面向主题的、集成的、相对稳定的、反映历史变化的。其中数据集成是数据仓库构建的首要前提,指将多个分散的、异构的数据源整合在一起以便于后续的数据分析。将数据集成过程平台化,将极大提升数据开发人员的效率。本文主要内容为:

  1. 数据集成 VS 数据同步
  2. 集成需求
  3. 数据集成 V1
  4. 数据集成 V2
  5. 线上效果
  6. 总结
伴鱼数据集成平台的设计与实现 FL.jpg

Flink 中文学习网站
https://flink-learning.org.cn

A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.

—— Bill Inmon

一、数据集成 VS 数据同步

「数据集成」往往和「数据同步」在概念上存在一定的混淆,为此我们对这二者进行了区分。

二者的关系如下图所示:

vs

「数据同步平台」提供基础能力,不掺杂具体的业务逻辑。「数据集成平台」是构建在「数据同步平台」之上的,除了将原始数据同步之外还包含了一些聚合的逻辑(如通过数据库的日志数据对快照数据进行恢复,下文将会详细展开)以及数仓规范相关的内容(如数仓 ODS 层库表命名规范)等。目前「数据同步平台」的建设正在我们的规划之中,但这并不影响「数据集成平台」的搭建,一些同步的需求可提前在「实时计算平台」创建,以「约定」的方式解耦。

值得一提的是「数据集成」也应当涵盖「数据采集」(由特定的工具支持)和「数据清洗」(由采集粒度、日志规范等因素决定)两部分内容,这两部分内容各个公司都有自己的实现,本文将不做详细介绍。

二、集成需求

目前伴鱼内部数据的集成需求主要体现在三块:Stat Log (业务标准化日志或称统计日志)、TiDB 及 MongoDB。除此之外还有一些 Service Log、Nginx Log 等,此类不具备代表性不在本文介绍。另外,由于实时数仓正处于建设过程中,目前「数据集成平台」只涵盖离线数仓(Hive)。

由于以上两种类型的数据集成方式差异较大,下文将分别予以讨论。

三、数据集成 V1

伴鱼早期「数据集成平台」已具备雏形,这个阶段主要是借助一系列开源的工具实现。随着时间推进,这个版本暴露的问题也逐渐增多,接下来将主要从数据流的角度对 V1 进行阐述,更多的细节问题将在 V2 版本的设计中体现。

3.1 Stat Log

日志的集成并未接入平台,而是烟囱式的开发方式,数据集成的链路如下图所示:

v1.0_log

Kafka 中的数据先经过 Flume 同步至 HDFS,再由 Spark 任务将数据从 HDFS 导入至 Hive 并创建分区。整体链路较长且引入了第三方组件(Flume)增加了运维的成本,另外 Kafka 的原始数据在 HDFS 冗余存储也增加了存储的开销。

3.2 DB

DB 数据的集成主要是基于查询的方式(批的方式,通过 Select 查询进行全表扫描得到快照数据)实现,其链路如下图所示:

v1.0_db

用户通过平台提交集成任务,由 Airflow 定时任务扫描集成平台元数据库,生成对应的取数任务(TiDB 的数据通过 Sqoop 工具,MongoDB 的数据则通过 Mongoexport 工具)。可以看到 V1 版本并没有获取数据库的变更的日志数据,不能满足对数据变更过程的分析诉求。

由于 Sqoop 任务最终要从 TiDB 生产环境的业务数据库获取数据,数据量大的情况下势必对业务数据库造成一定的影响。Mongoexport 任务直接作用在 MongoDB 的隐藏节点(无业务数据请求),对于线上业务的影响可以忽略不计。基于此,DBA 单独搭建了一套 TiDB 大数据集群,用于将体量较大的业务数据库同步至此(基于 TiDB Pump 和 Drainer 组件),因此部分 Sqoop 任务可以从此集群拉群数据以消除对业务数据库的影响。从数据流的角度,整个过程如下图所示:

v1.0

是否将生产环境 TiDB 业务数据库同步至 TiDB 大数据集群由数仓的需求以及 DBA 对于数据量评估决定。可以看出,这种形式也存在着大量数据的冗余,集群的资源随着同步任务的增加时长达到瓶颈。并且随着后续的演进,TiDB 大数据集群也涵盖一部分数据应用生产环境的业务数据库,集群作用域逐渐模糊。

四、数据集成 V2

V2 版本我们引入了 Flink,将同步的链路进行了简化,DB 数据集成从之前的基于查询的方式改成了基于日志的方式(流的方式),大大降低了冗余的存储。

4.1 Stat Log

借助 Flink 1.11 版本后对于 Hive Integration 的支持,我们可以轻松的将 Kafka 的数据写入 Hive,因此 Stat Log 的集成也就变得异常简单(相比 V1 版本,去除了对 Flume 组件的依赖,数据冗余也消除了),同时 Flink Exactly-Once 的语义也确保了数据的准确性。从数据流的角度,整个过程如下图所示:

v2.0_log

目前按照小时粒度生成日志分区,几项 Flink 任务配置参数如下:

checkpoint: 10 min
watermark: 1 min
partition.time-extractor.kind: ‘custom’
sink.partition-commit.delay: ‘3600s’
sink.partition-commit.policy.kind: ‘metastore,success-file’
sink.partition-commit.trigger: ‘partition-time’

4.2 DB

基于日志的方式对 DB 数据进行集成,意味着需要采集 DB 的日志数据,在我们目前的实现中 TiDB 基于 Pump 和 Drainer 组件(目前生产环境数据库集群版本暂不支持开启 TICDC),MongoDB 基于 MongoShake 组件,采集的数据将输送至 Kafka。采用这种方式,一方面降低了业务数据库的查询压力,另一方面可以捕捉数据的变更过程,同时冗余的数据存储也消除了。不过由于原始数据是日志数据,需要通过一定的手段还原出快照数据。新的链路如下图所示:

v2.0_db

用户提交集成任务后将同步创建三个任务:

「存量任务」和「Merge 任务」由离线调度平台 Dolphinscheduler(简称 DS)调度执行,任务执行过程中将从集成任务的元数据库中获取所需的信息。目前「Merge 任务」按小时粒度调度,即每小时还原快照数据。

从数据流的角度,整个过程如下图所示:

v2.0

DB 的数据集成相较于 Stat Log 复杂性高,接下来以 TiDB 的数据集成为例讲述设计过程中的一些要点(MongoDB 流程类似,区别在于存量同步工具及数据解析)。

4.2.1 需求表达

对于用户而言,集成任务需要提供以下两类信息:

4.2.2 存量任务

存量任务虽然有且仅执行一次,但为了完全消除数据集成对业务数据库的影响,我们选择数据库的备份-恢复机制来实现。公司内部数据库的备份和恢复操作已经平台化,集群将定期进行备份(天粒度),通过平台可以查询到集群的最新备份,并且可由接口触发备份恢复操作,故存量的获取可直接作用于恢复的数据库。

由于数据库备份的时间点与集成任务提交的时间点并不一定是同一天,这之间存在着一定的时间差将导致存量快照数据不符合我们的预期,各时间点的关系如下图所示:

time

按照我们的设定,存量快照数据应当是包含 T4 之前的全部数据,而实际备份的快照数据仅包含 T1 之前的全部数据,这之间存在这 N 天的数据差。

注:这里之所以不说数据差集为 T1 至 T4 区间的数据,是因为增量的 Binlog 数据是以整点为分区的,在 Merge 的时候也是将整点的分区数据与存量数据进行聚合,并支持了数据去重。因此 T1 时刻的存量数据与 T0-T3 之间的增量数据的 Merge 结果等效于 T0 时刻的存量数据与 T0-T3 之间的增量数据的 Merge 结果。所以 T1 至 T4 的数据差集等效 T0 至 T3 的数据差集,即图示中的 N 天数据。

对于缺失的这部分数据实则是可以在「存量任务」中进行补全,仔细分析这其实是可以通过执行的 「Merge 任务」的补数操作实现。

整个「存量任务」的工作流如下图所示:

stock

4.2.3 Merge 任务

Merge 任务的前提是存量数据与增量数据都已经 ready,我们通过 _SUCCESS 文件进行标记。整个「Merge 任务」的工作流如下图所示:

merge

Merge 操作通过 Flink DataSet API 实现。核心逻辑如下:

核心代码:

allMergedData.groupBy(x -> x.getKeyCols())
             .reduce(new ReduceFunction<MergeTransform>() {
                 
                 public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {
                     if (value1.getCommitTS() > value2.getCommitTS()){
                         return value1;
                     }
                     return value2;
                 }
             })
             .filter(new FilterFunction<MergeTransform>() { //增量:过滤掉 op=delete
                 
                 public boolean filter(MergeTransform merge) throws Exception {
                     if (merge.getOpType().equals(OPType.DELETE)){
                         return false;
                     }
                     return true;
                 }
             })
             .map(x -> x.getHiveColsText())
             .writeAsText(outPath);

主要思想为「后来者居上」,针对于 Insert、Update 操作,最新值直接覆盖旧值,针对 Delete 操作,直接丢弃。这种方式也天然的实现了数据去重操作。

4.2.4 容错性与数据一致性保证

我们大体可以从三个任务故障场景下的处理方式来验证方案的容错性。

以上,通过自动恢复机制和报警机制确保了整个工作流的正确执行。接下来我们可以从数据的角度看一下方案对于一致性的保障。

数据的一致性体现在 Merge 操作。两份数据聚合,从代码层面一定可以确保算法的正确性(这是可验证的、可测试的),那么唯一可能导致数据不一致的情况出现在两份输入的数据上,即存量和增量,存在两种情况:

针对 Flink 流式写 Hive 过程中的乱序数据处理可以采取两种手段:一是 Kafka 设置单分区,多分区是产生导致乱序的根因,通过避免多分区消除数据乱序。二是报警补偿,乱序一旦产生流式任务是无法完全避免的(可通过 watermark 设置乱序容忍时间,但终有一个界限),那么只能通过报警做事后补偿。问题转换成了如何感知到乱序,我们可以进一步分析,既然乱序数据会触发前一个分区的二次提交,那么只需要在提交分区的时候检测前一个分区是否存在 _SUCCESS 标记便可以知晓是否是乱序数据以及触发报警。

五、线上效果

总览

home

存量任务

ds_stock

Merge 任务

ds_merge

六、总结

本文阐述了伴鱼「数据集成平台」核心设计思路,整个方案还有一些细节未在文章中体现,如数据 Schema 的变更、DB 日志数据的解析等,这些细节对于平台构建也至关重要。目前伴鱼绝大部分的集成任务已切换至新的方式并稳定运行。我们也正在推进实时数仓集成任务的接入,以提供更统一的体验。

原文:伴鱼数据集成平台的设计与实现


近期热点

img
上一篇下一篇

猜你喜欢

热点阅读