Flink 异步I/O

2019-08-14  本文已影响0人  k_wzzc

1.为什么需要异步IO

flink在做实时处理时,有时候需要和外部数据交互,但是通常情况下这个交互过程是同步的,这样就会产生大量的等待时间;而异步操作可以在单个函数实例中同时处理多个请求,并且同时接收相应。这样等待时间就平均分摊到了多个请求上,大大减少了请求的等待时长,可以提高实时处理的吞吐量。


flink异步io.png

2.使用flink异步IO的先决条件

3. flink异步IO的使用步骤

4. 使用示例

import scala.concurrent._
import ExecutionContext.Implicits.global

/**
  * 使用scala并发包的Future模拟一个异步客户端
  */
class DatabaseClient {
  def query: Future[Long] = Future {
    System.currentTimeMillis() / 1000
  }
}

/** 'AsyncFunction' 的一个实现,向数据库发送异步请求并设置回调 
  * 改编自官网实例
  */

class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {

  /** The database specific client that can issue concurrent requests with callbacks */
  lazy val client: DatabaseClient = new DatabaseClient

  /** The context used for the future callbacks */
  implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(str: Int,
                           resultFuture: ResultFuture[(Int, Long)]): Unit = {

    // issue the asynchronous request, receive a future for the result
    val resultFutureRequested: Future[Long] = client.query

    // set the callback to be executed once the request by the client is complete
    // the callback simply forwards the result to the result future
    resultFutureRequested.onSuccess {
      case result: Long => resultFuture.complete(Iterable((str, result)))
    }
  }
}

object AsynchronousIOTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data: immutable.Seq[Int] = Range(1, 10)

    // 创建数据流
    val dataStream: DataStream[Int] = env.fromCollection(data)

     // 使用异步IO
     val asyn = AsyncDataStream.unorderedWait(
      dataStream,//执行异步操作的DataStream
      new AsyncDatabaseRequest,//
      1000, TimeUnit.MILLISECONDS, //超时时间
      100 // 进行中的异步请求的最大数量
    )

    asyn.print()

    env.execute("AsynchronousIOTest")

  }

}
结果顺序

AsyncDataStream 有两个静态方法,orderedWait 和 unorderedWait,对应了两种输出模式:有序和无序。

超时处理

当异步IO请求超时时,默认情况下会引发异常并重新启动作业。如果要处理超时,可以重写AsyncFunction#timeout方法。

  override def timeout(input: Int,
                       resultFuture: ResultFuture[(Int, Long)]): Unit =
    super.timeout(input, resultFuture)

容错保证

异步IO运算符提供完全一次的容错保证。它在检查点中存储正在进行的异步请求的记录,并在从故障中恢复时恢复/重新触发请求。

其他资料

关于flink异步IO的更多信息可以参考flink官网或者Flink 原理与实现:Aysnc I/O这篇文章。

Futures和事件驱动编程的知识可以参考《AKKA入门与实践》这本书第二章的内容:Actor与并发。


参考资料:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入门与实践》

上一篇下一篇

猜你喜欢

热点阅读