2020-11-29 实时同步
背景
数据源进入数据仓库,需要一步ETL操作,传统通过离线的方式,将前一天T+1的数据导入到ODS层中。但是随着公司业务不断发展,数据量不断增加,这种T+1的导入方式,耗时越来越长,任务的稳定性越来越差,账单表2019年11月的增量数据为6000万/天,2020年11月的增量数据为1.8亿/天,增长了3倍,按原来的方式,离线拉取数据需要2 - 3小时,如果失败了,需要重新拉取。重新拉取需要2 - 3小时,后置任务都需要等待无法执行,集群资源空闲,当账单表任务完成后,所有的后置任务都启动了,扎堆运行,会导致资源负载非常高。账单表任务的延迟还会导致当天的任务都会延迟2~3小时,相关报表延迟展示,所有的分析、运营人员需要的数据也会延迟,相关分析人员可能要到下午才能开始正常分析、挖掘。
我们一年的KPI失败次数是12次,而5月份,已经失败了6次,每次延迟或者失败,都会有凌晨4点的告警电话,值班人员起床手动重跑。
- 下图是集群资源使用的监控信息
- 离线导入数据时间长,核心宽表任务没有完成,后置任务无法启动,集群无法工作,资源使用率低,任务无法使用,0~3点集群利用率低, CPU平均使用率38%,内存 平均使用率 50%。
- 3点到14点之间,大量任务运行,内存平均使用率98%,CPU平均使用率75%,如果能够将0~3点这个时间段利用起来,任务可以提前3小时完成
image.png
为了提高核心宽表稳定性、降低耗时、提高集群资源利用率,需要对原来的方案进行优化。
方案
目标
利用实时计算实现数据实时同步,降低离线采集时间,提高任务稳定性,从而提高集群利用率。用小吃多餐的方式,分担离线一次性导入数据的性能瓶颈,该方案只适用于T+1增量采集的数据表。
数据流
image.png- 原来离线采集的方式,需要将数据通过网络,拉取到本地,再上传到集群里的spark表,数据量大的时候,不仅拉取上传的时间长,还会导致客户端机器负载非常高,容易出现异常
- 通过玄武实时采集的方式同步到kafka,再通过实时任务将数据实时同步到spark表。将一整天的数据,拆分到每分每秒,分而治之。当前一天的数据没有延迟,就可以直接使用,不需要原来离线采集的拉取跟上传操作
简单例子
image.png- 首先我们的目的就是需要将DB表的数据导入到spark对应的一张表里
- DB表只有3条数据,将离线同步数据的操作,转化为对流水数据的全量同步后,流水数据会有6条操作数据,对应spark表也会有6条数据
- 最后将spark表的6条数据,按一定的逻辑去重,这里的逻辑主键为id+operType,按照这2个组成的规则去重后,得到的数据,就是跟DB表的数据一致
具体实现
mysql表
比如mysql表是以下5个字段的数据
Findex | Forder_id | Fversion | Fcreate_time | Fmodify_time |
---|---|---|---|---|
17106 | O20201208505953012260 | 2 | 20-12-08 15:10:07 | 2020-12-08 15:12:25 |
kafka数据
{
"headers":{
"data_size":"254",
"log_file_offset":"813159285",
"database_name":"fund_credit_trade_each_26_db",
"pipeline_time_consuming":"204",
"log_file_name":"binlog.000040",
"db_move_version":"0",
"table_name":"t_trade_order_amount_0",
"start_time":"2020-12-08 15:12:00",
"event_execute_time":"1607411545000",
"hash_code":"5",
"record_oper_type":"U",
"exec_time":"2020-12-08 15:12:25",
"data_table_id":"480",
"monitor_topic":"C8_JG_OTTER_FundcredittradeDB_fund_credit_trade_each_--_db.t_trade_order_amount_-_topic",
"murmur_hash_code":"1949271285",
"pipeline_time_consuming_without_canal":"4",
"schema_table_name":"fund_credit_trade_each_--_db.t_trade_order_amount_-"
},
"body":{
"Findex":"17106",
"Forder_id":"O20201208505953012260",
"Fversion":"2",
"Fcreate_time":"2020-12-08 15:10:07",
"Fmodify_time":"2020-12-08 15:12:25"
}
}
- 玄武实时采集的时候,将mysql表的数据,存放到body里面,用K-V的json格式,跟原表数据保持原样,然后再headers会增加许多元数据信息,如binlog信息、库表信息等。
spark表
相比原来的离线采集的方式,实时采集的数据是binlog流水日志,spark表不支持update操作,为了数据的最终一致性,需要增加一些逻辑字段,方便对数据进行去重操作。除了保留DB表的所有字段,还会增加以下字段。
字段名 | 对应字段 | 描述 |
---|---|---|
fetl_time | 数据的写入spark表时间 | |
fencrypt_version | 脱敏数据版本 | |
f_etl_offset | kafka 中数据的offset | |
fetl_version | SDK写入的版本号 | |
f_etl_db_name | headers 中的database_name | 数据从属的库名 |
f_etl_table_name | headers 中的table_name | 数据从属的表名 |
f_etl_op_type | headers 中的record_oper_type | 操作类型 string,值: “U”,“I”,“D”,分别代表 update,insert,delete |
fetl_event_execute_time | headers 中的event_execute_time | binlog event时间 |
fetl_db_move_version | headers 中的db_move_version | 重置位点次数/位点版本 |
fetl_binlog_name | headers 中的log_file_name,去掉前面7个字符 | binlog文件ID |
fetl_binlog_position | headers 中的log_file_offset | binlog位点 |
spark表去重逻辑
(
select o.*,
row_number() over(
partition by f_etl_db_name,
f_etl_table_name,
fid (相关主键,可以是orderid,findex...)
order by
fetl_binlog_name desc,
fetl_binlog_position desc,
case
when f_etl_op_type = 'D' then 1
when f_etl_op_type = 'U' then 2
when f_etl_op_type = 'I' then 3
else 4
end
) as frank
from rt_ods.dspostloandb_postloan_xx_db_t_user_repay_info_y o
WHERE f_p_date = '2020-12-07'
) t
where frank = 1
and f_etl_op_type != 'D'
image.png
- 逻辑就是通过binlog数据,找到最新的一条数据,我们用一个例子说明
- 首先我们知道fetl_binlog_name、 fetl_binlog_position 都是递增的,fetl_binlog_name binlog文件名,越大说明数据越新,fetl_binlog_position 位点信息,越大代表数据越新,
- 在mysql中orderId O20201215897453114900,在2020-12-05 08:59:02.0新建后(查看表中的f_etl_op_type=I),更新了7次(查看表中的f_etl_op_type=U),一共是8条binlog日志,最后的数据是图中的第一条
- 在Spark表中将完整记录这8条数据(1条Insert, 7条Update),为了让Spark表 8条数据去重后跟 mysql orderId O202012158974531149001条数据数据一致,需要对这8条数据去重,逻辑就是上面的Sql。
- 利用partition by找到同个库同个表以及该表的主键的所有数据,按照fetl_binlog_name desc,fetl_binlog_position desc,并且操作流水不为'D'(删除)的数据,就是找到最新的且不是删除的一条数据,就是2020-12-05 09:01:55.0这条数据,对应mysql最新的数据。
实时采集主要遇到的问题
使用要求
- 实时同步只适合增量任务
- 业务方需要按照DBA规范:更新数据的时候需要更新Fmodify_time
表名生成规则
instanceName.toLowerCase()+"_"+dbName.replace("#", "").toLowerCase()+"_"+tableName.replace("#", "").toLowerCase()
- 数据源支持:分库分表、百库十表、单库单表,实时同步到相应的1张Spark表
- 实例名:WateruserinfoDB
- 库名:user_info_water_fql_#xx#_#yy#_db
- 表名:t_user_info_water_fql_#z#
- 生成的实时表:
rt_ods.wateruserinfodb_user_info_water_fql_xx_yy_db_t_user_info_water_fql_z
数据吞吐量
- 目前接入256实例,28000+个表,每日处理量54000/秒,11MB/秒
数据准确性
- 通过binlog数据完整性来实现数据准确性,发送端保证数据不丢失,消费端保证数据不漏消费
- 如果binlog文件过大,导致磁盘容量不足,为了保证系统正常,binlog文件会被删除文件被删除的话。采集来不及采集就会导致数据就缺失,最终导致Spark表数据缺失。所以通过增加
f_etl_offset、fetl_event_execute_time、fetl_db_move_version、fetl_binlog_name bigint、fetl_binlog_position bigint
4个字段,来判断binlog是否完整,如果不完整,则走原来的离线同步方式
数据可用性
- 实时消费的时候,将表的最大时间记录起来,通过查记录跟当前时间比较,判断是否延迟同步来判断Spark表数据是否可用
spark表数据去重
每一条数据都有唯一键,唯一键就是标识是唯一一条数据的,加上我们增加的附加字段组合作为去重的逻辑。可以完美解决解决的是数据重复问题。
效果
当天数据已经同步好,不在需要离线同步的时间,只需要将数据按规则去重后,便可以使用,减少了数据导入的时间,大大提高集群利用率以及任务完成率。
image.png
- 减少账单表数据导入时间1.5 ~ 3小时
- 核心任务9点前完成率从60%提升到 85% 提升25%
- 宽表任务3点前完成率从27%提升到95% 提升66%
- 0~3点资源使用率 内存88%,提升21%,CPU 59%,提升14%
- 上线后4个月宽表异常0次,管理组同事无需4点起床处理
运营
- 目前用于数据仓库重要任务日采集成功数620-780,成功率82%~91%
- 跟踪每日实时同步情况,对异常问题,进行定位分析,解决项目中的bug,提高实时同步任务成功率跟成功数
总结
实时同步利用了大数据解决方案的架构设计的解决思路、设计思想 -- 分而治之。
其实生活中这样的例子无处不在,例如让你扛一袋沙子,我一次扛不动,那么我拿个小桶,分开一桶一桶的搬,这其实就是分而治之的思路。
海量数据处理的分而治之
所谓海量数据处理,其实很简单,海量,海量,何谓海量,就是数据量太大,所以导致要么是无法在较短时间内迅速解决,要么是数据太大,导致无法一次性装入内存。
那解决办法呢?针对空间,无非就一个办法:大而化小:分而治之/hash映射,就把规模大化为规模小的。
- 一道常见的面试题
给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
假如每个url大小为10bytes,那么可以估计每个文件的大小为50G×64=320G,远远大于内存限制的4G,所以不可能将其完全加载到内存中处理,可以采用分治的思想来解决。
大数据框架设计的分而治之
分布式存储HDFS中,采用将大文件切片的方式,比如一个1G的文件,切分成8个128MB的文件,在分布式计算框架中,不管是MR引擎,还是Spark计算引擎,利用的都是这个原理,大文件处理慢,就切分成多个小文件,各个处理,提高并行度。
Kafka中的topic拥有多个partition也是这种思想,通过提高parition数量提升吞吐量。
分而治之还有JDK7中的Fork/Join框架,是一个并行执行任务的框架。原理:把大任务割成若干小任务,最终汇总小任务的结果得到大任务结果的框架,可以理解成一个单机版的MR/Spark计算框架。
架构设计的分而治之
image.png利用分而治之的思想,在实时同步的架构中实现。从上图可见,我们将数据读取T+1修改为从mysql binlog的方式去获取数据,采用玄武
进行binlog日志的获取以及解析。这其实也是一个拆分的思想,从原先的数据拉到客户端机器,一次性处理,拆分成数据端-DB-玄武-Kafka,将一整天的数据,拆分到每分每秒处理,分而治之。过程虽然变长了,但是实时性跟稳定性大幅度提升。
在理想情况下,我们在不清楚业务系统需求的时候,设计出来的架构跟具体业务系统的架构是不吻合的。当业务系统无法提供你所要的需求的时候,从不同的层面去思考,将业务进行拆分,或许会给你不一样的解决方案。
参考链接:
https://zhuanlan.zhihu.com/p/55869671
https://baijiahao.baidu.com/s?id=1649619201425234676&wfr=spider&for=pc