2020-11-29 实时同步

2020-12-08  本文已影响0人  LancerLin_LX

背景

数据源进入数据仓库,需要一步ETL操作,传统通过离线的方式,将前一天T+1的数据导入到ODS层中。但是随着公司业务不断发展,数据量不断增加,这种T+1的导入方式,耗时越来越长,任务的稳定性越来越差,账单表2019年11月的增量数据为6000万/天,2020年11月的增量数据为1.8亿/天,增长了3倍,按原来的方式,离线拉取数据需要2 - 3小时,如果失败了,需要重新拉取。重新拉取需要2 - 3小时,后置任务都需要等待无法执行,集群资源空闲,当账单表任务完成后,所有的后置任务都启动了,扎堆运行,会导致资源负载非常高。账单表任务的延迟还会导致当天的任务都会延迟2~3小时,相关报表延迟展示,所有的分析、运营人员需要的数据也会延迟,相关分析人员可能要到下午才能开始正常分析、挖掘。
我们一年的KPI失败次数是12次,而5月份,已经失败了6次,每次延迟或者失败,都会有凌晨4点的告警电话,值班人员起床手动重跑。

image.png

为了提高核心宽表稳定性、降低耗时、提高集群资源利用率,需要对原来的方案进行优化。

方案

目标

利用实时计算实现数据实时同步,降低离线采集时间,提高任务稳定性,从而提高集群利用率。用小吃多餐的方式,分担离线一次性导入数据的性能瓶颈,该方案只适用于T+1增量采集的数据表。

数据流

image.png

简单例子

image.png

具体实现

mysql表

比如mysql表是以下5个字段的数据

Findex Forder_id Fversion Fcreate_time Fmodify_time
17106 O20201208505953012260 2 20-12-08 15:10:07 2020-12-08 15:12:25

kafka数据

{
    "headers":{
        "data_size":"254",
        "log_file_offset":"813159285",
        "database_name":"fund_credit_trade_each_26_db",
        "pipeline_time_consuming":"204",
        "log_file_name":"binlog.000040",
        "db_move_version":"0",
        "table_name":"t_trade_order_amount_0",
        "start_time":"2020-12-08 15:12:00",
        "event_execute_time":"1607411545000",
        "hash_code":"5",
        "record_oper_type":"U",
        "exec_time":"2020-12-08 15:12:25",
        "data_table_id":"480",
        "monitor_topic":"C8_JG_OTTER_FundcredittradeDB_fund_credit_trade_each_--_db.t_trade_order_amount_-_topic",
        "murmur_hash_code":"1949271285",
        "pipeline_time_consuming_without_canal":"4",
        "schema_table_name":"fund_credit_trade_each_--_db.t_trade_order_amount_-"
    },
    "body":{
        "Findex":"17106",
        "Forder_id":"O20201208505953012260",
        "Fversion":"2",
        "Fcreate_time":"2020-12-08 15:10:07",
        "Fmodify_time":"2020-12-08 15:12:25"
    }
}

spark表

相比原来的离线采集的方式,实时采集的数据是binlog流水日志,spark表不支持update操作,为了数据的最终一致性,需要增加一些逻辑字段,方便对数据进行去重操作。除了保留DB表的所有字段,还会增加以下字段。

字段名 对应字段 描述
fetl_time 数据的写入spark表时间
fencrypt_version 脱敏数据版本
f_etl_offset kafka 中数据的offset
fetl_version SDK写入的版本号
f_etl_db_name headers 中的database_name 数据从属的库名
f_etl_table_name headers 中的table_name 数据从属的表名
f_etl_op_type headers 中的record_oper_type 操作类型 string,值: “U”,“I”,“D”,分别代表 update,insert,delete
fetl_event_execute_time headers 中的event_execute_time binlog event时间
fetl_db_move_version headers 中的db_move_version 重置位点次数/位点版本
fetl_binlog_name headers 中的log_file_name,去掉前面7个字符 binlog文件ID
fetl_binlog_position headers 中的log_file_offset binlog位点

spark表去重逻辑

(
    select o.*,
        row_number() over(
            partition by f_etl_db_name,
            f_etl_table_name,
            fid (相关主键,可以是orderid,findex...)
            order by 
                fetl_binlog_name desc,
                fetl_binlog_position desc,
                    case
                    when f_etl_op_type = 'D' then 1
                    when f_etl_op_type = 'U' then 2
                    when f_etl_op_type = 'I' then 3
                    else 4
                end
        ) as frank
    from rt_ods.dspostloandb_postloan_xx_db_t_user_repay_info_y o
    WHERE f_p_date = '2020-12-07'
) t
where frank = 1
    and f_etl_op_type != 'D'

image.png

实时采集主要遇到的问题

使用要求

表名生成规则

instanceName.toLowerCase()+"_"+dbName.replace("#", "").toLowerCase()+"_"+tableName.replace("#", "").toLowerCase()

数据吞吐量

数据准确性

数据可用性

spark表数据去重

每一条数据都有唯一键,唯一键就是标识是唯一一条数据的,加上我们增加的附加字段组合作为去重的逻辑。可以完美解决解决的是数据重复问题。

效果

当天数据已经同步好,不在需要离线同步的时间,只需要将数据按规则去重后,便可以使用,减少了数据导入的时间,大大提高集群利用率以及任务完成率。


image.png

运营

image.png image.png

总结

实时同步利用了大数据解决方案的架构设计的解决思路、设计思想 -- 分而治之。
其实生活中这样的例子无处不在,例如让你扛一袋沙子,我一次扛不动,那么我拿个小桶,分开一桶一桶的搬,这其实就是分而治之的思路。

海量数据处理的分而治之

所谓海量数据处理,其实很简单,海量,海量,何谓海量,就是数据量太大,所以导致要么是无法在较短时间内迅速解决,要么是数据太大,导致无法一次性装入内存。
那解决办法呢?针对空间,无非就一个办法:大而化小:分而治之/hash映射,就把规模大化为规模小的。

大数据框架设计的分而治之

分布式存储HDFS中,采用将大文件切片的方式,比如一个1G的文件,切分成8个128MB的文件,在分布式计算框架中,不管是MR引擎,还是Spark计算引擎,利用的都是这个原理,大文件处理慢,就切分成多个小文件,各个处理,提高并行度。
Kafka中的topic拥有多个partition也是这种思想,通过提高parition数量提升吞吐量。
分而治之还有JDK7中的Fork/Join框架,是一个并行执行任务的框架。原理:把大任务割成若干小任务,最终汇总小任务的结果得到大任务结果的框架,可以理解成一个单机版的MR/Spark计算框架。

image.png

架构设计的分而治之

image.png

利用分而治之的思想,在实时同步的架构中实现。从上图可见,我们将数据读取T+1修改为从mysql binlog的方式去获取数据,采用玄武进行binlog日志的获取以及解析。这其实也是一个拆分的思想,从原先的数据拉到客户端机器,一次性处理,拆分成数据端-DB-玄武-Kafka,将一整天的数据,拆分到每分每秒处理,分而治之。过程虽然变长了,但是实时性跟稳定性大幅度提升。
在理想情况下,我们在不清楚业务系统需求的时候,设计出来的架构跟具体业务系统的架构是不吻合的。当业务系统无法提供你所要的需求的时候,从不同的层面去思考,将业务进行拆分,或许会给你不一样的解决方案。

参考链接:
https://zhuanlan.zhihu.com/p/55869671
https://baijiahao.baidu.com/s?id=1649619201425234676&wfr=spider&for=pc

上一篇下一篇

猜你喜欢

热点阅读