Spark_Flink_HadoopflinkFlink

聊聊Flink异步I/O机制的原理

2019-11-22  本文已影响0人  LittleMagic

在不久前的这篇文章中,提出了一种用Flink做流处理时join外部维度数据的简单方法。但是它的适用情境毕竟有限,通用的方法则是从Flink 1.2版本引入的异步I/O(Async I/O)机制。

异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自官方文档,一看便知。

要享受异步I/O带来的便利,前提就是我们有一个能异步请求外部系统的客户端。如果原生没有提供,就得自己创建有限大小的线程池,将客户端放到线程池里调用。

异步I/O的原始设计文档见FLIP-12。但是随着时间的推移,它里面的内容与目前最新的Flink 1.9版本的实现有了一定出入,所以就不参考它了,直接讲讲笔者读过1.9版本的相关源码之后总结出的东西吧。

在调用AsyncDataStream.orderedWait()/unorderedWait()方法时,本质上是产生了一个AsyncWaitOperator算子,它是异步I/O的核心。每个AsyncWaitOperator都由三个主要的部分组成。

所谓StreamElementQueueEntry就是StreamElement(Flink基础概念,可以是流中的一条数据,或是一个水印等)的简单封装,通过j.u.c.CompletableFuture实现异步返回。CompletableFuture是JDK 8提供的新特性,可以认为是非常好用的Future改进版,这里就不再展开讲了。

以下是以StreamElementQueueEntry为中心展开的类图。看官会注意到它有两种实现:代表数据的StreamRecordQueueEntry,和代表水印的WatermarkQueueEntry。它们都持有CompletableFuture。

AsyncWaitOperator的机制可以用下面的简图来表示。

  1. 来自上游的StreamElement进入AsyncWaitOperator的StreamElementQueue,并被封装成StreamElementQueueEntry。
  2. AsyncWaitOperator调用传入的AsyncFunction的asyncInvoke()方法,该方法异步地与外部系统交互。
  3. 异步操作完成后,由asyncInvoke()方法显式地调用ResultFuture.complete()方法,将结果返回;或者调用completeExceptionally()方法表示出现了异常。ResultFuture就是CompletableFuture的代理接口。
  4. Emitter线程从StreamElementQueue中拉取那些已经完成了的StreamElementQueueEntry,并输出到下游算子。

以上的分析说明了AsyncWaitOperator的工作流程,但是没有考虑输出流的顺序性。实际上会有以下两种情况:

简单讨论一下。

异步I/O的检查点做起来很容易。由上面的分析可以知道,StreamElementQueue保存的就是尚未完成异步请求的元素,以及已完成异步请求但还没有送到Emitter发送的元素,只要遍历该队列,并将它们都放入状态后端就OK。

Happy Friday night,晚安。

上一篇下一篇

猜你喜欢

热点阅读