Flink 维表Join/双流Join 方法总结
一、背景
事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照。引
二、维表Join
- 预加载维表
将维表全量预加载到内存里去做关联,具体的实现方式就是我们定义一个类,去实现 RichFlatMapFunction,然后在 open 函数中读取维度数据库,再将数据全量的加载到内存,然后在 probe 流上使用算子 ,运行时与内存维度数据做关联。
这个方案的优点就是实现起来比较简单,缺点也比较明显,因为我们要把每个维度数据都加载到内存里面,所以它只支持少量的维度数据。同时如果我们要去更新维表的话,还需要重启作业,所以它在维度数据的更新方面代价是有点高的,而且会造成一段时间的延迟。对于预加载维表来说,它适用的场景就是小维表,变更频率诉求不是很高,且对于变更的及时性的要求也比较低的这种场景。
改进:open()新建一个线程定时加载维表,这样就不需要人工的去重启 Job 来让维度数据做更新,可以实现一个周期性的维度数据的更新。
- distributed cache
通过 Distributed cash 的机制去分发本地的维度文件到 Task Manager 后再加载到内存做关联。实现方式可以分为三步:
- 通过 env.registerCached 注册文件。
- 实现 RichFunction,在 open 函数里面通过 RuntimeContext 来获取 Cache 文件。
- 解析和使用这部分文件数据。
因为数据要加载到内存中,所以支持的数据量比较小。而且如果维度数据需要更新,也是需要重启作业的。
那么它适用的场景就是维度数据是文件形式的、数据量比较小、并且更新的频率也比较低的一些场景,比如说我们读一个静态的码表、配置文件等等。
-
热存储关联
Image_20210907174514.png
把维度数据导入到像 Redis、Tair、HBase 这样的一些热存储中,然后通过异步 IO 去查询,并且叠加使用 Cache 机制,还可以加一些淘汰的机制,最后将维度数据缓存在内存里,来减轻整体对热存储的访问压力。
如上图展示的这样的一个流程。在 Cache 这块的话,比较推荐谷歌的 Guava Cache,它封装了一些关于 Cache 的一些异步的交互,还有 Cache 淘汰的一些机制,用起来是比较方便的。
异步 IO 可以并行发出多个请求,整个吞吐是比较高的,延迟会相对低很多。如果使用异步 IO 的话,它对于外部存储的吞吐量上升以后,会使得外部存储有比较大的压力,有时也会成为我们整个数据处理上延迟的瓶颈。所以引入 Cache 机制是希望通过 Cache 来去减少我们对外部存储的访问量。
这个方案的优点就是维度数据不用全量加载到内存中,不受限于内存大小。
但是需要依赖热存储资源,再加上cache过期时间,所以最后结果会有一定的延迟。
适用于维度数据量比较大,能接受维度更新有一定延迟的情况。
- 广播维表
利用 Broadcast State 将维度数据流广播到下游 Task 做 Join。
- 将维度数据发送到 Kafka 作为广播原始流 S1
- 定义状态描述符 MapStateDescriptor。调用 S1.broadcast(),获得 broadCastStream S2
- 调用非广播流 S3.connect(S2),得到 BroadcastConnectedStream S4
- 在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 实现关联处理逻辑,并作为参数调用 S4.process()
广播维表虚啊维度的变更可以及时的更新到结果,但是数据还是需要保存在内存中,因为它是存在 State 里的,所以支持维表数据量仍然不是很大。适用的场景就是我们需要时时的去感知维度的变更,且维度数据又可以转化为实时流。
- Temporal table function join
首先说明一下什么是 Temporal table?它其实是一个概念:就是能够返回持续变化表的某一时刻数据内容的视图,持续变化表也就是 Changingtable,可以是一个实时的 Changelog 的数据,也可以是放在外部存储上的一个物化的维表。
它的实现是通过 UDTF 去做 probe 流和 Temporal table 的 join,称之 Temporal table function join。这种 Join 的方式,它适用的场景是维度数据为 Changelog 流的形式,而且我们有需要按时间版本去关联的诉求。
- 在 Changelog 流上面去定义 TemporalTableFunction,这里面有两个关键的参数是必要的。第1个参数就是能够帮我们去识别版本信息的一个 Time attribute,第 2 个参数是需要去做关联的组件。
- 在 tableEnv 里面去注册 TemporalTableFunction 的名字。
维表Join方案对比
Image_20210907181322.png
三、双流Join
批处理有两种方式处理两个表的Join,一种是基于排序的Sort-Merge Join,更一种是转化为Hash Table 加载到内存里做Hash Join。
在双流Join的场景中,Join的对象是两个流,数据是不断进入的,所以我们Join的结果也是需要持续更新的。基本思路是将一个无线的数据流,尽可能拆分成有限数据集去做Join。
-
Regular Join
这种 Join 方式需要去保留两个流的状态,持续性地保留并且不会去做清除。两边的数据对于对方的流都是所有可见的,所以数据就需要持续性的存在 State 里面,那么 State 又不能存的过大,因此这个场景的只适合有界数据流。 -
Interval Join
加入了一个时间窗口的限定,要求在两个流做 Join 的时候,其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的 Join key 相同才能够完成 Join。加入了时间窗口的限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的 State。
Interval Join 是同时支持 processing time 和 even time去定义时间的。如果使用的是 processing time,Flink 内部会使用系统时间去划分窗口,并且去做相关的 state 清理。如果使用 even time 就会利用 Watermark 的机制去划分窗口,并且做 State 清理。
- Window join
将两个流中有相同 key 和处在相同 window 里的元素去做 Join。它的执行的逻辑比较像 Inner Join,必须同时满足 Join key 相同,而且在同一个 Window 里元素才能够在最终结果中输出。