聊聊Flink异步I/O机制的原理
在不久前的这篇文章中,提出了一种用Flink做流处理时join外部维度数据的简单方法。但是它的适用情境毕竟有限,通用的方法则是从Flink 1.2版本引入的异步I/O(Async I/O)机制。
异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自官方文档,一看便知。
要享受异步I/O带来的便利,前提就是我们有一个能异步请求外部系统的客户端。如果原生没有提供,就得自己创建有限大小的线程池,将客户端放到线程池里调用。
异步I/O的原始设计文档见FLIP-12。但是随着时间的推移,它里面的内容与目前最新的Flink 1.9版本的实现有了一定出入,所以就不参考它了,直接讲讲笔者读过1.9版本的相关源码之后总结出的东西吧。
在调用AsyncDataStream.orderedWait()/unorderedWait()方法时,本质上是产生了一个AsyncWaitOperator算子,它是异步I/O的核心。每个AsyncWaitOperator都由三个主要的部分组成。
- AsyncFunction:执行异步操作的函数,用户需要覆写其asyncInvoke()方法并传入。
- StreamElementQueue:包含StreamElementQueueEntry的队列,底层由ArrayDeque实现。
- Emitter:单独的守护线程,将异步调用完成后的结果发送给下游算子。
所谓StreamElementQueueEntry就是StreamElement(Flink基础概念,可以是流中的一条数据,或是一个水印等)的简单封装,通过j.u.c.CompletableFuture实现异步返回。CompletableFuture是JDK 8提供的新特性,可以认为是非常好用的Future改进版,这里就不再展开讲了。
以下是以StreamElementQueueEntry为中心展开的类图。看官会注意到它有两种实现:代表数据的StreamRecordQueueEntry,和代表水印的WatermarkQueueEntry。它们都持有CompletableFuture。
AsyncWaitOperator的机制可以用下面的简图来表示。
- 来自上游的StreamElement进入AsyncWaitOperator的StreamElementQueue,并被封装成StreamElementQueueEntry。
- AsyncWaitOperator调用传入的AsyncFunction的asyncInvoke()方法,该方法异步地与外部系统交互。
- 异步操作完成后,由asyncInvoke()方法显式地调用ResultFuture.complete()方法,将结果返回;或者调用completeExceptionally()方法表示出现了异常。ResultFuture就是CompletableFuture的代理接口。
- Emitter线程从StreamElementQueue中拉取那些已经完成了的StreamElementQueueEntry,并输出到下游算子。
以上的分析说明了AsyncWaitOperator的工作流程,但是没有考虑输出流的顺序性。实际上会有以下两种情况:
- 调用AsyncDataStream.orderedWait():创建OrderedStreamElementQueue队列,保持请求的顺序与输出结果的顺序相同,亦即先进先出。
- 调用AsyncDataStream.unorderedWait():创建UnorderedStreamElementQueue队列,不保持顺序。在采用处理时间时,先返回的结果先输出。而采用事件时间时,需要额外保证水印的边界不错乱。
简单讨论一下。
-
有序
有序是最简单的情况,只需要将元素按照到来的顺序放入OrderedStreamElementQueue。只有当队列中的队头请求异步操作返回了结果,才会触发Emitter输出,后面的请求先返回也只能等待。 -
无序(处理时间)
这种情况也不难办。在UnorderedStreamElementQueue中维护两个子队列,一个是未完成请求的队列(uncompletedQueue),一个是已完成请求的队列(completedQueue)。所有请求都先进入uncompletedQueue并执行异步操作,并按照操作完成的顺序进到completedQueue中。Emitter从completedQueue拉取并输出结果即可。如下图所示。
-
无序(事件时间)
这是比较复杂的情况:我们允许两个水印之间的元素乱序,但是水印不能乱。所以在使用两个队列的同时,uncompletedQueue中还必须存储水印,这就是上面的WatermarkQueueEntry的由来。在水印之间存储的也不再是单个StreamElementQueueEntry,而是它们的集合。只有当uncompletedQueue中的队头集合有元素的异步操作返回了,才能将其移动到completedQueue里面。这样就可以保证在通过某个水印之前,它前面的所有异步请求都完成。如下图所示。
异步I/O的检查点做起来很容易。由上面的分析可以知道,StreamElementQueue保存的就是尚未完成异步请求的元素,以及已完成异步请求但还没有送到Emitter发送的元素,只要遍历该队列,并将它们都放入状态后端就OK。
Happy Friday night,晚安。