java8 CompletableFuture Stupid A

2020-06-20  本文已影响0人  Queen〇fLaponia

  java8 CompletableFutureScala Future 相比,充斥了没所谓没必要的概念和名词,不过因为需要考察HBase2.2的异步客户端,所以需要了解基本用法,首先了解下CompletableFuture,之后记录一下HBase自带异步客户端的测试情况。

IBM Java并发性:阻塞还是不阻塞?

A Brief Overview Of Scala Futures

掘金:Java 和 Scala 中的 Future

直到java7:The only way to get the result from a future object is to call its get() method in the thread that submitted the task. This method is blocking;所以从java5推出一直到7的 Future 只能相当于个概念车吧,java8开始量产车

  java8的java.util.concurrent包低调加推:CompletionStage接口及其CompletableFuture 实现类,使得java Future真正具备实用性,一定程度上使得java向面向并发语言前进了一大步。

  java8围绕CompletableFuture 构建了CompletionStage patterns.  CompletionStage顾名思义是一串操作的各个阶段,CompletionStage就是在这些异步阶段之间传递参数和执行Task的模型,java8是以类似Akka Stream的形式构建异步函数执行Chain:

  1、CompletableFuture<T> cf1 =  CompletableFuture.supplyAsync(() -> {   () -> 生产操作; }, executor) //supplyAsync是静态方法,方便你直接开始一段异步调用链

  2、CompletableFuture<T> cf2 =  cf1.thenApply( (argument) -> 中间处理并返回结果 )

  3、CompletableFuture<T> cf3 =  cf2.thenAccept( (argument) -> 收尾处理 )

  4、还可以再加最后一步:cf3.thenRun( () -> 额外操作 )

  下游的CompletionStages由上游CompletionStages的完成事件自动触发执行,不需要你的main线程介入,上游返回的结果会fed 给下游,除了第一个supplyAsync方法接收参数类型为Supplier,上述3个CompletionStages参数的Task类型分别是Function、Consumer和Runnable.  这些类型由方法名暗示:方法名前缀是then的,其后缀暗示了Task类型(run for Runnable, accept for Consumer, and apply for Function)

  因为java8只有残缺的函数式特性和蹩脚的泛型,所以不像scala的Task就是一段表达式这么统一精炼,为了适配有无输入参数和返回值,不得不定义出Runnable、Consumer、Supplier和Function等几种Task类型,这些所谓的“函数式接口”注定是过渡性名词,到java14这些名词都会消弭无形。

  Consumer、Supplier本质都是函数而已,你可以借助Akka Stream的概念理解为:

  1、Supplier无参数只有返回,相当于无参函数,它只生产数据用于向后传递,类似Source;

  2、Consumer只有参数无返回,它接收参数数据(一般是Supplier生产的)做处理,无返回,类似Sink.

  3、Function有参数有返回。既然有上述二者,当然也会有位于中间、接收参数、做处理并返回的东西,即java.util.function.Function,类似Flow. 

  就是这么隐晦猥亵,就像就为了个返回值还要在Runnable之外再定义一个Callable,就为了有没有接收参数有没有返回值,他们竟然能总结出四种情况的“函数式接口”:

茴字的四种写法

  令人怀疑java8就为了匹配java.util.concurrent并发包,硬撸出来个java.util.function包,所以如果你要用java.util.concurrent写高并发程序、还必须用到java.util.function的一堆东西... 这就是面向对象难以为继的表现:名词泛滥,最终可能会达到一个无法收拾的地步,就像某种没有构词法的语言,每当有新事物出现,就得发明一个新名词指代它,这种龟毛语言能存在几年?除了不断发明新名词,连带着类内部的方法也不得不膨胀,他们自己也承认:

That makes a lot of methods in the CompletableFuture class!  Three types of tasks, times four types of chaining and composition, times three ways of specifying which ExecutorService we want this task to be run. We have 36 methods to chain tasks. This is probably one of the elements that make this class complex: the high number of available methods.

  java8开发者自己也觉得长此以往名词王国国将不国,终于开始支持() ->"Hello world"这种字面函数语法了,java.util.function包里的一坨东西,你可以眼不见心不烦了,上述的几个新名词你可以不管它们,在编码使用的时候:

  1、使用Supplier一般写作:supplyAsync( () ->"Hello world" )

  2、使用Consumer写作:thenAccept*( s -> println(s) ) //*表示Async //相当于to方法

  3、使用Function写作:thenApply*( s -> println(s); s+1 ) //*表示Async //相当于via方法

  4、then开头的方法命名方式是:run for Runnable, accept for Consumer, and apply for Function,所以除了上述thenAccept、thenApply还有一个:

  thenRun*( () ->Unit )   //*表示Async;() ->Unit表示Runnable,无参数无返回的无聊行为

  避免了继续写这种裹脚布一样的代码:

new Consumer[String]() {

      def accept(s: String): Unit = {

        System.out.println(s)

      }

}

  那么thenAccept和thenAcceptAsync、thenApply和thenApplyAsync、thenRun和thenRunAsync都有啥区别呢?简单说是:An async method executes its task in the default fork/join pool, unless it takes an Executor, in which case, the task will be executed in this Executor.  即:带async后缀的方法,如果传入一个新的Executor则提交该Executor运行,如果没有传入则会运行在一个通用fork/join线程池。java源码的注释更详细:

  Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  All  async  methods without an explicit Executor argument are performed using the {@link ForkJoinPool#commonPool()} (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).  To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface {@link AsynchronousCompletionTask}

  OK,我们要尽量异步化执行所有任务的话,理论上应该尽量使用带Async后缀的方法,同时搭配规划线程池的使用,了解到这一层足矣了。不过通过HBase异步客户端的初步测试,带Async后缀的方法虽然单独运行在一个ForkJoinPool.commonPool,但是完成任务的时间还不如不带Async后缀的方法(注意这并不能说明性能不好),我的Source是HBase异步客户端AsyncTable异步表的putAll方法,它生产出来一个CompletableFuture<Void>,因为后续任务比较简单直接是thenAccept完事,这些不带Async后缀的方法将复用前一个方法线程池:HBase异步客户端的线程池Default-IPC-NioEventLoopGroup

  实际上这些性能都无所谓,单机硬件的并发度是有上限的,我们使用异步客户端也不能提升HBase真正落盘入库的效率,我们使用的目的主要是不要阻塞程序中的其他线程罢了,其次是将任务分阶段、这些阶段全异步执行之后,在大并发场景会带来总体的吞吐量效率提升,这一块提升肉眼可见,不过肯定不是特别大幅度的提升,所谓异步仍然是空间换时间,虽然异步方法都是无阻塞返回了,但是不代表数据在HBase真的这么快落盘了,只是待提交的数据仍然在内存中由某个线程代持。我的测试是对HBase表做putAll(不写WAL)、然后flush,过程中发现HBase表的putAll操作耗时是最突出的,所以只要将这一步异步化就能收到关键效果了,这一步是调用HBase异步表API,从这一步开始已经是异步化了,至于这些异步线程的真实效率并不是我们主要关心的,我们主要关心我们的main线程不会被IO阻塞,实际上对我的场景来说,HBase异步客户端的作用无足轻重,可用可不用,只是异步的AsyncAdmin和 AsyncTable线程安全不需要关闭、代码少点麻烦而已。

  更多的用法细节可以看java8 CompletableFuture,Scala Future另一个很好的参考文档是Akka Futures。总之,学院派的OO已经走到了尽头,异步编程的复杂性本来就是指数级增长,java8的异步编程基础已经这么繁杂冗长,用它去做大规模并发系统开发可以肯定几乎是不可能的事,对后续java版本也不会在这种薄弱的基础上继续,事实上,java14的语法已经全面靠近scala.(估计在java14,java.util.function包已经被废弃),不过回过头来看Scala的实现,本质也是一样的,比如function22,其实在JVM之上做法都类似,都只能做到让你眼不见心不烦。

  我的场景下,实际上用了三个不同类型的线程池:

  1、SI的task:executor:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,只是基本的ThreadPoolExecutor,接收处理任务的源头;

  2、scala.concurrent.ExecutionContext.Implicits.global:有意义的ForkJoin池,异步代码主要结构基础;

  3、HBase异步客户端的线程池Default-IPC-NioEventLoopGroup,不用也不行,HBase异步客户端AsyncTable.putAll返回结果很简陋只是个CompletableFuture<Void>,推测仅仅是用了个Runnable去做的,这样的话在putAll当中有任何异常都不是很好处理,所以感觉HBase异步客户端并不是十分的完善。无意发现一个scala.concurrent.java8.FuturesConvertersImpl,不知道能不能做java Future和scala Future之间的转换?

  在scala线程的执行过程内部嵌套了HBase异步客户端线程、使用了SI所以基于sprinBoot打包应用,都没有什么问题。现代java应用因为依赖很多,往往会有意无意带了一堆线程池,有的你知道有的你不知道。这里我希望第一个接收任务的线程池不要被阻塞,所以所有的阻塞任务都放到了2和3,2需要做一些磁盘IO、3需要和HBase远程交互也就是网络IO,测试也证实这种IO操作都是耗时大户,有意无意用了三个不同的线程池,自然隔离出去这些阻塞操作,实现了舱壁模式,测试当中1肯定不会被阻塞,只需要一两个线程就可以保持保活的接收任务服务。

  在scala Future的执行过程中,有一些额外的中间结果希望委托给别的Future去单独处理,也不需要处理结果也不对处理负责,相当于从一个Future A  Fork出另一个Future B去处理一些额外的中间结果,看了下scala Future的自带方法没有合适的,这种情况可以直接撸一个Future B分支去处理,Future A仍然自己运行自己的,没什么问题,像这样:

Future {

  Future B的任务...

}.recover[Boolean] {

  case t:Throwable =>println("A Throwable has occured!: " + t.getCause);false

  case e:Exception =>println("An Exception has occured!: " + e.getCause);false

  case er:Error =>println("An Error has occured!: " + er.getCause);false

}.onComplete {

  case Success(ok) =>if(ok) println("is Done") else println("So what?")

  case Failure(ko) =>println("Never "+ko)

}

  类似的,CompletableFuture也有一个静态方法runAsync可以直接去实现一个Fork & Forget分支,方便你直接开始一段异步任务,但也是返回的CompletableFuture<Void>,不够完善,相比之下Scala  Future的onComplete、recover给你提供了完备的异常处理。上述Future 分支在传统java里相当于:

new Thread(

    new Runnable(){

        .......

    }

).start

上一篇下一篇

猜你喜欢

热点阅读