基于flink sql构建实时数据仓库
1、需求背景
根据目前大数据这一块的发展,已经不局限于离线的分析,挖掘数据潜在的价值,数据的时效性最近几年变得刚需,实时处理的框架有storm,spark-streaming,flink等。想要做到实时数据这个方案可行,需要考虑以下几点:1、状态机制 2、精确一次语义 3、高吞吐量 4、可弹性伸缩的应用 5、容错机制,刚好这几点,flink都完美的实现了,并且支持flink sql高级API,减少了开发成本,可用实现快速迭代,易维护等优点。
2、离线数仓和实时数仓对比
离线数仓的架构图:
离线数仓的架构图.png
实时数仓架构图:
实时数仓架构图.png
差异点 | 离线 | 实时 |
---|---|---|
时效性 | 目前ods层是小时级数据,其余都是天级 | 要求时延达到毫秒级别 |
复杂度 | 基于kimball严格分层,将公共计算逻辑下沉 | 简化分层架构 |
存储位置 | 基于hive,主要存储于HDFS | kafka,hbase/redis |
框架选用 | kafka,camus,hive | kafka,flink,hbase |
业务需求 | 需要支持上层应用分析,报表需求,推荐接口等 | 实时数据分析,实时指标,实时风控等 |
3、实时数仓的架构详细介绍
3.1、横向划分介绍(层级划分)
3.1.1、数据接入(source)
- 流量日志
流量数据天然就是流式的,具有实时性,主要是如何采集的问题。流量数据对应着流量数据域,binlog往往是其它数据域,比如交易域,营销域等,以流量日志为例,打点日志上报到nginx服务器,使用flume进行数据采集,sink进kafka,目前kafka只保留最近三天的数据,考虑到流量日志的数据量大,并且也没有保留多天的意义,保留三天,可以预留重刷历史数据的机会,如果是要查看多天以前的数据情况,完全可以用离线的。所以整套实时数仓体系建设都是为了保障近一天的数据分析。 - 关系型数据库
目前大多数互联网公司使用的生产数据库都是mysql,以mysql为例,mysql发生任意变更都会产生binlog,使用开源的canal解释原生二进制的数据,然后将解释完的数据sink进kafka,在将kafka作为flink source进行消费。这里值得一提的是,关系型数据库的数据往往是用于构建除流量域以外的数据域使用的。
3.1.2、数据计算(transform)
-
ods层
该成属于明细层,主要担任的职责是对数据进行清洗,解析,规范化处理,拿流量日志来说,使用flink sql对接kafka,使用自定义的udtf函数解析kafka当中的原始log,产生结构化数据,并且在次写入kafka的另一个topic当中,这就是我们的实时ods层数据了。当然关系型数据库相对来得简单一点,因为我们已经使用canal解析好了,直接使用flink sql读取kafka源就好了。 -
隐藏的校验层
为了校验实时数据的准确性(不管是流量数据还是binlog数据),还需要将存于kafka的ods层数据,写入hdfs上,使用hive和hdfs的文件进行映射,产生实时的hive表(目前是小时级别),该hive表可用于和离线hive表进行数据校正。 -
dwd层
dwd层的数据是从ods层读取,然后根据需求进行逻辑处理,包括关联相应的维度表(维度表的建设后续会提及),即进行降维操作。 -
DM层
DM层存储的是一些按照一定粒度进行聚合的值,我们之所以选择将DM层的数据存于hbase,原因在于hbase集群可扩展,支持巨量数据,并且根据rowkey查找,性能也很感人(前提是使用得当),最关键的是hbase的rowkey是天然唯一的,刚好符合聚合的模式,我们只需要将聚合的字段作为rowkey的因子,在用MD5加密一下,就是rowkey了。 -
APP/RPT层
该层对应着离线的应用层/报表层,这一层跟业务是紧密耦合的,但是这一层产出的数据来源离不开我们底层的建设。
3.1.3、数据存储(sink)
目前是将实时维度表和DM层数据存于hbase当中,实时公共层都存于kafka当中,并且以写滚动日志的方式写入HDFS(主要是用于校验数据)。其实在这里可以做的工作还有很多,kafka集群,flink集群,hbase集群相互独立,这对整个实时数据仓库的稳定性带来一定的挑战。
3.2、纵向划分介绍(数据域划分)
一个数据仓库想要成体系,成资产,离不开数据域的划分。所以参考着离线的数据仓库,想着在实时数仓做出这方面的探索,理论上来讲,离线可以实现的,实时也是可以实现的。 并且目前已经取得了成效,目前划分的数据域跟离线大致相同,有流量域,交易域,营销域等等。当然这里面涉及到维表,多事务事实表,累计快照表,周期性快照表的设计,开发,到落地这里就不详述了。
3.3、实时维度表介绍
维度表也是整个实时数据仓库不可或缺的部分。从目前整个实时数仓的建设来看,维度表有着数据量大,但是变更少的特点,我们试想过构建全平台的实时商品维度表或者是实时会员维度表,但是这类维度表太过于复杂,所以针对这类维度表下面介绍。还有另外一种就是较为简单的维度表,这类维度可能对应着业务系统单个mysql表,或者只需要几个表进行简单ETL就可以产出的表,这类维表是可以做成实时的。以下有几个实施的关键点:
- mysql是天然的实时维度表,可惜不能用?
在流式程序里面,维度表跟流进行join,面临着很高的QPS,或者可以理解为长连接,如果直接用flink去读取mysql作为维表,大概率是会挂掉的,对生产系统的稳定性有很大影响。 - 实时维表存于hbase当中
既然mysql行不通,只能另寻他方,如果hbase里面能维护一份跟mysql实时同步的维表,那么问题应该是可以解决的,因为hbase跟生产系统无关。那么如何实现实时同步呢?我们做法是先全量同步一份数据到hbase当中,(注意rowkey的设计,这个因子一定是用于维表关联用的),然后将该mysql表的binlog日志使用canal解析完落到kafka,使用flink去消费kafka的数据,然后将新增和变更的数据实时同步到hbase,这样就实现了mysql和hbase的数据实时同步。
4.实时数仓难点讨论
4.1 如何保证接入数据的准确性
如下是离线数据同步架构图:
离线数据同步架构图.png
4.1.1实时和离线数据接入的差异性
实时数据的接入其实在底层架构是一样的,就是从kafka那边开始不一样,实时用flink的UDTF进行解析,而离线是定时(目前是小时级)用camus拉到HDFS,然后定时load HDFS的数据到hive表里面去,这样来实现离线数据的接入。实时数据的接入是用flink解析kafka的数据,然后在次写入kafka当中去。
4.1.2如何建立实时数据和离线数据的可比较性
由于目前离线数据已经稳定运行了很久,所以实时接入数据的校验可以对比离线数据,但是离线数据是小时级的hive数据,实时数据存于kafka当中,直接比较不了,所以做了相关处理,将kafka的数据使用flink写HDFS滚动日志的形式写入HDFS,然后建立hive表小时级定时去load HDFS中的文件,以此来获取实时数据。
4.1.3如何确定比较的时间区间
完成以上两点,剩余还需要考虑一点,都是小时级的任务,这个时间卡点使用什么字段呢?首先要确定一点就是离线和实时任务卡点的时间字段必须是一致的,不然肯定会出问题。目前离线使用camus从kafka将数据拉到HDFS上,小时级任务,使用nginx_ts这个时间字段来卡点,这个字段是上报到nginx服务器上记录的时间点。而实时的数据接入是使用flink消费kafka的数据,在以滚动日志的形式写入HDFS的,然后在建立hive表load HDFS文件获取数据,虽然这个hive也是天/小时二级分区,但是离线的表是根据nginx_ts来卡点分区,但是实时的hive表是根据任务启动去load文件的时间点去区分的分区,这是有区别的,直接筛选分区和离线的数据进行对比,会存在部分差异,应当的做法是筛选范围分区,然后在筛选nginx_ts的区间,这样在跟离线做对比才是合理的。
4.2如何保证接入数据的时延
目前实时数据接入层的主要时延是在UDTF函数解析上,实时的UDTF函数是根据上报的日志格式进行开发的,可以完成日志的解析功能。
解析流程图如下:
日志解析流程.png
解析速率图如下:
解析速率图.png
该图还不是在峰值数据量的时候截的,目前以800记录/second为准,大概一个记录的解析速率为1.25ms。
目前该任务的flink资源配置核心数为1,假设解析速率为1.25ms一条记录,那么峰值只能处理800条/second,如果数据接入速率超过该值就需要增加核心数,保证解析速率。
4.3 维度表设计成实时的复杂度过高
4.3.1实时维表背景介绍
介绍一下目前离线维度表的情况,就拿商品维度表来说,全线记录数将近一个亿,计算逻辑来自40-50个ods层的数据表,计算逻辑相当复杂,如果实时维度表也参考离线维度表来完成的话,那么开发成本和维护成本非常大,对于技术来讲也是很大的一个挑战,并且目前也没有需求要求维度属性百分百准确。所以目前(伪实时维度表)准备在当天24点产出,当天的维度表给第二天实时公共层使用,即T-1的模式。伪实时维度表的计算逻辑参考离线维度表,但是为了保障在24点之前产出,需要简化一下离线计算逻辑,并且去除一些不常用的字段,保障伪实时维度表可以较快产出。
实时维度表的计算流程图:
实时维度表的计算流程图.png
4.3.2在实施的过程当中的细节点
-
根据实时维度表需要的属性字段对离线维度表进行简化操作,并且裁剪ods层的计算逻辑,理顺实时维度表的计算逻辑。
-
实时维度表使用到的stage和ods层数据表保存周期都不需要太长,一般保存数天就好。
-
由于实时维度表需要在24点之前产出并写入到hbase当中,所以要考虑将任务定于几点开始跑,比如所有抽取任务和ods计算任务都从23点开始跑,当然要看具体任务耗时来定,如果耗时过长需要在提前一点。
-
根据以上步骤去完成,感觉剩下来只要将数据写入hbase就好了,但是这里也有一个巨坑。如果将rowkey设计成md5(pt+维度表主键),然后hbase保存近两天的数据,这样当实时数据出现问题,我们还可以进行重刷数据。但是我们不管是商品维度表还是用户维度表都达到了数千万的级别,如果每天全量写入hbase的话,我们做了压测计算hbase的写入速率,大概400百万条/10min,如果同步以一亿条记录的话,大概就需要250分钟,对于时效要求这么高的实时维度表,这个时间肯定是接收不了的,所以row的设计不能将pt放入,但是这样的话就无法保存历史数据,如果实时数据发生异常,重刷数据时部分实时公共层关联的维度信息是不准确的,所以我们在这点上做了取舍,放弃重刷数据,毕竟出现数据异常的概率很小,就算出现了,关联的维度信息不准确的部分也很少(维度信息每天只会有部分发生变化,可能不到百分之一)。既然这种全量走不通,就要考虑增量同步,如果区分该条记录是否发生了属性变化,我们采用的是将全字段做md5处理,只要任一一个字段发生变化,md5就会发生变化,在使用一个flag字段来做标识,flag的计算逻辑就是拿当天的md5和昨天的md5进行比较,相同为0(表示未变化),不同为1(表示发生变化),到时候我们只将flag=1的数据同步到hbase就好了,rowkey设计为md5(维度表主键),这样每天只会把变化差异维度记录同步到hbase,大概每天有几百万,这样的同步时间是可以接受的。其实这里还有一个小点没有考虑到,实时维度表假设是在23:50产出,那么23:50到24:00使用的就是最新的实时维度表了,而不是昨天的实时维度表,这也是存在部分差异的点,但是从目前这个情况考虑,暂时需要做一些取舍。
4.4 用flink进行窗口计算,窗口过大,内存问题
4.4.1、背景介绍
目前使用flink作为公司主流的实时计算引擎,使用内存作为状态后端,并且固定30s的间隔做checkpoint,使用HDFS作为checkpoint的存储组件。并且checkpoint也是作为任务restart以后恢复状态的重要依据。熟悉flink的人应该晓得,使用内存作为状态后端,这个内存是JVM的堆内存,毕竟是有限的东西,使用不得当,OOM是常有的事情,下面就介绍一下针对有限的内存,如果完成常规的计算。
- 滑动窗口 or 滚动窗口?
其实只是从内存使用的角度来讲,滑动窗口和滚动窗口都是可取的,关键看如何使用。首先要了解你的每个task产出的数据存于内存是个什么形式,比如你只是计算pv这种指标,哪怕你是使用窗口大小为24hours的滑动窗口也是可取的,因为这个状态在内存当中只是一个聚合值,不怎么占内存,当然如果多维聚合,条数特别多也是另当别论。举另外一个极端的例子,如果是流量数据,直接做map操作,这时候哪怕你是使用10min的滚动窗口,内存可能就吃不消了(具体要看每个flink任务分配多少内存和核数),因为流量数据非常大,而且在内存当中是以明细的形式存在,这时候就会非常占用内存。 - 数据量大,时间跨度长,需要聚合数据
这种情况好说,直接上滑动窗口,窗口大小可以大一点,因为状态不怎么占内存(多维度聚合值条数可能也很大,需要具体判断)。 - 数据量大,时间跨度长,并且需要明细数据
在实际应用当中,这情况很常见,如果只是使用上述的两个窗口,你会发现,好像都不能很好应对这种情况,为了应对这种情况,一般使用较小窗口大小的滚动窗口,比如10s的滚动窗口,然后将这个窗口计算完的值存于持久化存储hbase当中,然后在进行下一个10s窗口的计算,那么这样状态也不会丢失,flink内存也只需要存储10s的状态就好。 - 数据量大,时间跨度长,需要去重累计
这种情况应该是流式计算里面最麻烦的一种情况了,但是确实又存在,比如计算天级的uv(按userid进行去重得到的指标),这种情况,不能将状态直接聚合累计,像上述的2描述的一样,因为它需要去重,因为要去重就要维护着整个时间跨度内的明细数据,但是这样又非常占用内存,看似非常矛盾的一件事情,该如何去解决。
其实有多种方法,使用Hbase或者Redis都可以实现,这里以hbase为例,比如现在需要按照stat_date(日期)维度计算uv指标,那么可以将MD5(start_date+userid)作为rowkey插入到hbase当中,那么如果是同样的start_date+userid记录插入到hbase当中,记录数是不会增加的,因为rowkey一定是全局唯一,这样就实现了去重,那么如何实现累计呢,累计其实就是将hbase里面符合的条数取出,自己写一个小方法,思路大概就是这样。Redis其实是一样的原理,这里就不多做解释了。