大数据Flink

Flink SQL CDC 上线!我们总结了 13 条生产实践经

2020-09-14  本文已影响0人  Flink中文社区

作者:曾庆东,金地物业中级开发工程师,负责聚合营业平台实时计算开发及运维工作,从事过大数据开发,目前专注于apache flink实时计算,喜欢开源技术,喜欢分享。

01 项目背景

本人目前参与的项目属于公司里面数据密集、计算密集的一个重要项目,需要提供高效且准确的OLAP服务,提供灵活且实时的报表。业务数据存储在MySQL中,通过主从复制同步到报表库。作为集团级公司,数据增长多而且快,出现了多个千万级、亿级的大表。为了实现各个维度的各种复杂的报表业务,有些千万级大表仍然需要进行Join,计算规模非常惊人,经常不能及时响应请求。随着数据量的日益增长和实时分析的需求越来越大,急需对系统进行流式计算、实时化改造。正是在这个背景下,开始了我们与 Flink SQL CDC 的故事。

02 解决方案

针对平台现在存在的问题,我们提出了把报表的数据实时化的方案。该方案主要通过 Flink SQL CDC + Elasticsearch 实现。Flink SQL 支持 CDC 模式的数据同步,将 MySQL 中的全增量数据实时地采集、预计算、并同步到 Elasticsearch 中,Elasticsearch 作为我们的实时报表和即席分析引擎。项目整体架构图如下所示:

实时报表实现具体思路是,使用 Flink CDC 读取全量数据,全量数据同步完成后,Flink CDC 会无缝切换至 MySQL 的 binlog 位点继续消费增量的变更数据,且保证不会多消费一条也不会少消费一条。读取到的账单和订单的全增量数据会与产品表做关联补全信息,并做一些预聚合,然后将聚合结果输出到 Elasticsearch,前端页面只需要到 Elasticsearch 通过精准匹配(terms)查找数据,或者再使用 agg 做高维聚合统计得到多个服务中心的报表数据。

从整体架构中,可以看到,Flink SQL 及其 CDC 功能在我们的架构中扮演着核心角色。我们采用 Flink SQL CDC,而不是 Canal + Kafka 的传统架构,主要原因还是因为其依赖组件少,维护成本低,开箱即用,上手容易。具体来说Flink SQL CDC 是一个集采集、计算、传输于一体的工具,其吸引我们的优点有:

① 减少维护的组件、简化实现链路;
② 减少端到端延迟;
③ 减轻维护成本和开发成本;
④ 支持Exactly Once的读取和计算(由于我们是账务系统,所以数据一致性非常重要);
⑤ 数据不落地,减少存储成本;
⑥ 支持全量和增量流式读取;

有关 Flink SQL CDC 的介绍和教程,可以观看 Apache Flink 社区发布的相关视频:https://www.bilibili.com/video/BV1zt4y1D7kt/

项目使用的是 flink-cdc-connectors 中提供的 mysql-cdc 组件。这是一个 Flink 数据源,支持对 MySQL 数据库的全量和增量读取。它在扫描全表前会先加一个全局读锁,然后获取此时的 binlog position,紧接着释放全局读锁。随后开始扫描全表,当全表快照读取完后,会从之前获取的 binlog position 获取增量的变更记录。因此这个读锁是非常轻量的,持锁时间非常短,不会对线上业务造成太大影响。更多信息可以参考 flink-cdc-connectors 项目官网:https://github.com/ververica/flink-cdc-connectors

03 项目运行环境与现状

我们在生产环境搭建了 Hadoop + Flink + Elasticsearch 分布式环境,采用的 Flink on YARN 的 per-job 模式运行,使用 RocksDB 作为 state backend,HDFS 作为 checkpoint 持久化地址,并且做好了 HDFS 的容错,保证 checkpoint 数据不丢失。我们使用 SQL Client 提交作业,所有作业统一使用纯 SQL,没有写一行 Java 代码。

目前已上线了 3 个基于 Flink CDC 的作业,已稳定在线上运行了两个星期,并且业务产生的订单实收和账单实收数据能实时聚合输出到 Elasticsearch,输出的数据准确无误。现在也正在对其他报表采用 Flink SQL CDC 进行实时化改造,替换旧的业务系统,让系统数据更实时。

04 具体实现

① 进入 Flink/bin,使用 ./sql-client.sh embedded 启动 SQL CLI 客户端。
② 使用 DDL 创建 Flink Source 和 Sink 表。这里创建的表字段个数不一定要与 MySQL 的字段个数和顺序一致,只需要挑选 MySQL 表中业务需要的字段即可,并且字段类型保持一致。

-- 在Flink创建账单实收source表
CREATE TABLE bill_info (
  billCode STRING,
  serviceCode STRING,
  accountPeriod STRING,
  subjectName STRING ,
  subjectCode STRING,
  occurDate TIMESTAMP,
  amt  DECIMAL(11,2),
  status STRING,
  proc_time AS PROCTIME() -–使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc', -- 连接器
  'hostname' = '******',   --mysql地址
  'port' = '3307',  -- mysql端口
  'username' = '******',  --mysql用户名
  'password' = '******',  -- mysql密码
  'database-name' = 'cdc', --  数据库名称
  'table-name' = '***'
);

-- 在Flink创建订单实收source表
CREATE TABLE order_info (
  orderCode STRING,
  serviceCode STRING,
  accountPeriod STRING,
  subjectName STRING ,
  subjectCode STRING,
  occurDate TIMESTAMP,
  amt  DECIMAL(11, 2),
  status STRING,
  proc_time AS PROCTIME()  -–使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '******',
  'port' = '3307',
  'username' = '******',
  'password' = '******',
  'database-name' = 'cdc',
  'table-name' = '***',
);

-- 创建科目维表
CREATE TABLE subject_info (
  code VARCHAR(32) NOT NULL,
  name VARCHAR(64) NOT NULL,
  PRIMARY KEY (code) NOT ENFORCED  --指定主键
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = '***',
  'username' = '******',
  'password' = '******',
  'lookup.cache.max-rows' = '3000',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);

-- 创建实收分布结果表,把结果写到 Elasticsearch
CREATE TABLE income_distribution (
  serviceCode STRING,
  accountPeriod STRING,
  subjectCode STRING,
  subjectName STRING,
  amt  DECIMAL(13,2),
  PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://xxxx:9200',
  'index' = 'income_distribution',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);

以上的建表 DDL 分别创建了订单实收 source 表、账单实收 source 表、产品科目维表和 Elasticsearch 结果表。建表完成后,Flink 是不会马上去同步mysql的数据,而是等到用户提交了一个 insert 作业后才会执行同步数据,并且 �Flink 不会存储数据。我们的第一个作业是计算收入分布,数据来源于 bill_infoorder_info 两张 MySQL 表,并且账单实收表和订单实收表都需要关联维表数据获取应收科目的最新中文名称,按照服务中心、账期、科目代码和科目名称进行分组计算实收金额的sum值,实收分布具体DML如下:

INSERT INTO income_distribution
SELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt 
FROM (
  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt 
  FROM bill_info AS b
  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code 
  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
UNION ALL
  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
  FROM order_info AS b
  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code 
  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
) AS t1
GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;

Flink SQL 的维表 JOIN 和双流 JOIN 写法上不太一样,对于维表,还需要在 Flink source table 上添加一个proctime 字段 proc_time AS PROCTIME(),关联的时候使用 FOR SYSTEM_TIME AS OF 的 SQL 语法查询时态表,意思是关联查询最新版本的维表数据。关于维表 JOIN 的使用可参阅 https://ci.apache.org/projects/Flink/Flink-docs-release-1.11/zh/dev/table/streaming/joins.html

③ 在SQL Client 执行以上作业后,YARN 会创建一个 Flink 集群运行作业,并且用户可以在 Hadoop 上查看到执行作业的所有信息,并且能进入 Flink 的 Web UI 页面查看 Flink 作业详情,以下是 Hadoop所有作业情况。

④ 作业提交后,Flink SQL CDC 会扫描指定的 MySQL 表,在这期间 Flink 也会进行 checkpoint,所以需要按照上文所述的配置 checkpoint 的重试策略和重试次数。当数据被读取进 Flink 后,Flink 会流式地进行作业逻辑的计算,实时统计出聚合结果输出到 Elasticsearch(sink端)。相当于我们使用 Flink 在 MySQL 的表上维护了一个实时的物化视图,并将这个实时物化视图的结果存在了 Elasticsearch 中。在 Elasticsearch 中使用 GET /income_distribution/_search{ "query": {"match_all": {}}} 命令查看输出的实收分布结果,如下图:

通过图中的结果可以看出聚合结果被实时的计算出来,并写到了 Elasticsearch 中了。

05 踩过的坑和学到的经验

  1. Flink 作业原来运行在standalone session模式下,提交多个Flink作业会导致作业失败报错。

    • 原因:因为 standalone session 模式下启动多个作业会导致多个作业的 Task 共享一个 JVM,可能会导致一些不稳定的问题。并且排查问题时,多个作业的日志混再一个 TaskManager 中,增加了排查的难度。
    • 解决方法: 采用 YARN 的 per-job 模式启动多个作业,能有更好的隔离性
  2. SELECT elasticsearch table 报以下错误

  1. flink-conf.yaml 里修改默认并行度,但是在 Web UI 看到作业的并行度还是1,并行度修改不生效。
  1. Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:
execution.checkpointing.interval: 10min   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
restart-strategy: fixed-delay  # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

目前 Flink 社区也有一个 issue(Flink-18578)来支持 source 主动拒绝 checkpoint 的机制,将来基于该机制,能比较优雅地解决这个问题。

  1. Flink怎么样开启 YARN 的 per-job 模式?
  1. 进入SQL Client 创建 table 后,在另外一个节点进入 SQL Client 查询不到 table。
  1. 作业在运行时 Elasticsearch 报如下错误:

Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]

  1. 作业在运行时 mysql cdc source 报如下错误:
  1. 扫描全表阶段慢,在 Web UI 出现如下现象:
configuration:
  table.exec.mini-batch.enabled: true
  table.exec.mini-batch.allow-latency: 2s
  table.exec.mini-batch.size: 5000
  table.optimizer.distinct-agg.split.enabled: true
  1. CDC source 扫描 MySQL 表期间,发现无法往该表 insert 数据。
  1. 多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。
SELECT * 
FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;
  1. 在启动作业时,YARN 接收了任务,但作业一直未启动:
  1. AM进程启不来,一直被kill掉

06 总结

为了提升了实时报表服务的可用性和实时性,一开始我们采用了 Canal+Kafka+Flink 的方案,可是发现需要写比较多的 Java 代码,而且还需要处理好 DataStream 和 Table 的转换以及 binlong 位置的获取,开发难度相对较大。另外,需要维护 Kafka 和 Canal 这两个组件的稳定运行,对于我们小团队来说成本也不小。由于我们公司已经有基于 Flink 的任务在线上运行,因此采用 Flink SQL CDC 就成了顺理成章的事情。基于 Flink SQL CDC 的方案只需要编写 SQL ,不用写一行 Java 代码就能完成实时链路的打通和实时报表的计算,对于我们来说非常的简单易用,而且在线上运行的稳定性和性能表现也让我们满意。

我们正在公司内大力推广 Flink SQL CDC 的使用,也正在着手改造其他几个实时链路的任务。非常感谢开源社区能为我们提供如此强大的工具,也希望 Flink CDC 越来越强大,支持更多的数据库和功能。也再次云邪老师对于我们项目上线的大力支持!

上一篇下一篇

猜你喜欢

热点阅读