异步编程

异步编程三:reactor模式

2020-01-21  本文已影响0人  青_雉

书接上回,我们一起体验了promise模式,也了解到了其解决什么场景下的问题。
本篇文章的目的之一即回答好两个问题:

另外在概念层面,本篇文章希望能够解释清楚reactor领域一些常见的概念
实践层面,带读者体验一下reactor模式的写法,抛砖引玉

案例实现

还是promise文章里的案例:基于计算器服务,实现一个接口,接口实现计算 a + ((b -c)+ d) -e -f + g
本文实现依然是基于vertx和kotlin来做,只是异步结果编排部分替换为reactor实现:

class ReactorVerticle : AbstractVerticle(){

    lateinit var webClient: WebClient

    override fun start() {
        webClient = WebClient.create(vertx)
        var eventBus = vertx.eventBus()

        eventBus.consumer<JsonObject>("calc.reactor"){ msg ->
            var msgBody = msg.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)

            (b asyncSub c)
                .flatMap { it asyncAdd d }
                .flatMap { it asyncAdd a }
                .flatMap { it asyncSub e }
                .flatMap { it asyncSub f }
                .flatMap { it asyncAdd g }
                .doOnError {  msg.fail(500, it.message) }
                .subscribe { msg.reply(it.toString()) }

        }

    }

    infix fun Int.asyncAdd(input : Int) : Mono<Int> {
        return calc(this, input, CalcOperator.add)
    }

    infix fun Int.asyncSub(input : Int) : Mono<Int> {
        return calc(this, input, CalcOperator.sub)
    }

    /**
     * 所有异常必须被处理
     */
    fun calc(a: Int, b: Int, operator: CalcOperator) : Mono<Int> {

        return Mono.create { sink ->
            webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
                .expect(ResponsePredicate.SC_OK).send{
                    if (it.succeeded()) {
                        try{
                            var addResult = it.result().bodyAsString().toInt()
                            sink.success(addResult)
                            println("reactor calc: $a - $b = $addResult")
                        } catch (e: Exception) {
                            sink.error(e)
                        }
                    } else {
                        sink.error(it.cause())
                    }
                }
        }
    }
}

与promise模式实现的代码风格很像,在当前案例这种场景下可以说reactor模式可以做到和promise模式一样的效果。
但是reactor不仅仅解决promise的场景。

案例变种一

实现计算 a + ((b -c)+ d) -e -f + g,当 b-c > 5 时取b-c的结果,否则以6作为b-c的结果
以reactor模式实现代码如下:

(b asyncSub c)
                .filter{ it > 5 }
                .switchIfEmpty(Mono.just(6))
                .flatMap { it asyncAdd d }
                .flatMap { it asyncAdd a }
                .flatMap { it asyncSub e }
                .flatMap { it asyncSub f }
                .flatMap { it asyncAdd g }
                .doOnError {  msg.fail(500, it.message) }
                .subscribe { msg.reply(it.toString()) }

当然以promise模式来做的话,代码优化一下,应该也不会太丑,比如:

(b asyncSub c).thenCompose {
    var promise = CompletableFuture<Int>()
    if(it > 5){
        promise.complete(it)
    } else {
        promise.complete(6)
    }
    promise
}
.thenCompose { it asyncAdd d }
.thenCompose { it asyncAdd a }
.thenCompose { it asyncSub e }
.thenCompose { it asyncSub f }
.thenCompose { it asyncAdd g }
.thenAccept { msg.reply(it.toString()) }
.exceptionally {
    msg.fail(500, it.message)
    null
}

但是上面用到的filter和switchIfEmpty组合只是reactor里的两个操作符号而已,reactor里还有很多很丰富的各种操作符

最初的问题

下面来解答最开始的两个问题

  • reactor模式解决什么场景下的问题
  • reactor解决问题的场景与promise模式有哪些不同,有哪些重叠

reactor可以解决promise场景下的问题,而且解决方案更加优雅,并形成标准;有很多现成的轮子(各种操作符)拿来即用。但reactor不只是为了解决promise面对的问题的,他解决的问题笔者归纳如下(个人观点,欢迎讨论):

说法有点绕,reactor和promise本身是两个东西,promise顶多算一种设计模式,而reactor是一种编程风格;因为本系列文章从异步编程角度谈起,所以故事一路讲来把reactor和promise进行了一轮比较。

下面顺便引用两处权威文档:

asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
-- https://en.wikipedia.org/wiki/Reactive_programming

reactor核心概念串讲

下面用两张图来讲一下reactor模式;第一张图,两个工人,左边的是publisher、右边的是subscriber,中间是一个流水线, 流水线上有四道工序:

reactive stream

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/

reactor stream 定义了一套 接口project reactor 实现了这套接口并进行了一些拓展

所以基于reactor开发的代码,各种方法返回的Mono或者Flux都是属于publisher,publisher会被调用者subscribe到具体的subscriber,这个过程就是搭建流水线的过程。如果有0到1个货物要处理就用Mono;如果有0到n个货物要处理就用Flux。
这里只大概讲一下概念,更深入的内容建议熟读五遍 projectreactor文档,这个文档里会介绍为什么要用reactor,如何使用,如何选择操作符等各种问题。

reactor模式与http服务

java生态这么多年一路走来,做web服务,最原始大部分基于servlet模型,都是阻塞式io来做,与异步编程扯不上关系。随着时代的发展,对性能要求越来越高,高并发的web服务似乎是java的一个软肋。但基于异步编程,也衍生出了一些解决方案,可以与reactor模式一起玩的,列举常用的三种:

上面的三个每一个都可以单独列一个话题谈好多,暂不展开

缺点

reactor模式虽然看起来狂拽酷炫,漫不经心就把回调地狱给解决了,但是在复杂业务场景下,比如各种...if...else...嵌套的场景,如果想要代码保持清晰性、可读性,是非常考验设计功底和编码功底的,当然对笔者来说这一点亦是异步编程的魅力所在。
对程序员个人来讲,多接触一些思想和模式,有助于开阔思路,融会贯通;
但是对于公司来讲,做项目讲究性价比,采用入门门槛高的编程方式,就需要招能力强的程序员,反之只需要招一些刚刚毕业的实习生即可。
异步编程,我们从回调、promise聊到reactor,有没有其他可选方案呢?

协程,支持以同步的方式写异步的代码

笔者认为这也是最近这几年golang火起来的一个主要原因。
另外jvm体系kotlin也支持协程,与golang的协程在玩法上大不相同。
下一篇文章,我们来一起聊一聊协程。

参考文章

知乎上关于函数式编程的讨论
ReactiveX 文档翻译
projectreactor文档

系列文章快速导航:
异步编程一:异步编程的魅力
异步编程二:promise模式
异步编程三:reactor模式
异步编程四:协程

上一篇 下一篇

猜你喜欢

热点阅读