Flink Async I/O

2019-08-01  邵红晓

HBase client example: an AsyncFunction that looks up each input key in HBase on a dedicated thread pool and completes the ResultFuture with the fetched value (or a "timeout" marker).

import java.util.concurrent.{ExecutorService, Executors}

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Table}
import org.apache.hadoop.hbase.util.Bytes

class AsyncHbaseClient extends AsyncFunction[(String, Int), (String, Int, String)] {

  // HbaseUtil is the project's own connection helper; the table handle is created
  // lazily on each task, rather than being serialized with the function.
  lazy val table: Table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  // table.get below is a blocking call, so a dedicated thread pool provides the asynchrony
  lazy val executorService: ExecutorService = Executors.newFixedThreadPool(30)

  override def asyncInvoke(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    executorService.submit(new Runnable {
      override def run(): Unit = {
        val get = new Get(Bytes.toBytes(input._1))
        get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
        val result = table.get(get)
        val value = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
        // Always complete resultFuture; otherwise every record ends up in timeout()
        resultFuture.complete(Iterable((input._1, input._2, value)))
      }
    })
  }

  // Called by Flink when a request exceeds the timeout configured on AsyncDataStream
  override def timeout(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    resultFuture.complete(Iterable((input._1, input._2, "timeout")))
  }
}
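
One weakness of the version above: if table.get throws, resultFuture is never completed and the record only surfaces as a timeout. Below is a minimal sketch of the same client written with Scala's Future, routing failures through completeExceptionally; the class name AsyncHbaseClientWithFuture and the reuse of the HbaseUtil helper are illustrative assumptions, not part of the original code.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class AsyncHbaseClientWithFuture extends AsyncFunction[(String, Int), (String, Int, String)] {

  // Assumption: the same HbaseUtil connection helper as above
  lazy val table: Table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  // Back the Futures with a dedicated pool, since the HBase Get itself is blocking
  implicit lazy val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(30))

  override def asyncInvoke(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    Future {
      val get = new Get(Bytes.toBytes(input._1))
      get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
      Bytes.toString(table.get(get).getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
    }.onComplete {
      case Success(value) => resultFuture.complete(Iterable((input._1, input._2, value)))
      // Propagate the failure to Flink instead of silently waiting for timeout()
      case Failure(e)     => resultFuture.completeExceptionally(e)
    }
  }

  override def timeout(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    resultFuture.complete(Iterable((input._1, input._2, "timeout")))
  }
}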

Main method example (wrapped here in an AsyncHbaseDemo object, with the required imports, so it compiles standalone):

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ArrayBuffer

object AsyncHbaseDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val list = ArrayBuffer[String]()
    list.append("my-key-1")
    list.append("my-key-1")
    list.append("my-key-1")
    list.append("my-key-1")
    val text = env.fromCollection(list).map((_, 1))
    // Apply the async I/O transformation: 1000 ms timeout, at most 10 in-flight requests
    val resultStream: DataStream[(String, Int, String)] = AsyncDataStream.orderedWait[(String, Int), (String, Int, String)](
      text, new AsyncHbaseClient(), 1000L, TimeUnit.MILLISECONDS, 10
    )
    resultStream.print()
    env.execute("Async HBase Demo")
  }
}
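
A note on the orderedWait parameters: 1000L with TimeUnit.MILLISECONDS is how long a request may stay outstanding before timeout() fires, and 10 is the capacity, i.e. the maximum number of concurrent in-flight requests (beyond that, backpressure kicks in). orderedWait emits results in the original input order, buffering fast responses behind slow ones; if order does not matter, unorderedWait trades that guarantee for lower latency:

// Same signature as orderedWait; results are emitted as soon as they complete
val unordered: DataStream[(String, Int, String)] = AsyncDataStream.unorderedWait[(String, Int), (String, Int, String)](
  text, new AsyncHbaseClient(), 1000L, TimeUnit.MILLISECONDS, 10
)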