Reactive Streams
Reactive Streams + RxJava
为了标准化JVM上的异步流协议,响应流已经是共同努力的成果,. RxJava团队从一开始就参与了这项工作,并支持使用响应流api,并最终支持Java 9流api,这些结果与团队的努力息息相关。
这与RxJava本身有什么关系?
RxJava1.x
目前,Rxjava 1.x 不直接实现 响应流的接口。这是由于Rxjava 1.x 已经存在这些api且不能破坏公共的api。 然而它确实在语义上遵守非阻塞近似于背压和流控制的“响应式牵引”。且因此可以用作两种类型之间的桥梁。为了响应流实现之间的交互协作和通过响应流TCK兼容性测试,Rxjava响应流模型在RxJava1.x 类型和 响应流类型建立了一个桥梁。它的API像下面这样:
package rx;
import org.reactivestreams.Publisher;
public abstract class RxReactiveStreams {
public static <T> Publisher<T> toPublisher(Observable<T> observable) { … }
public static <T> Observable<T> toObservable(Publisher<T> publisher) { … }
}
RxJava 2.x
RxJava2.x 将直接针对Java 8+ 的响应流API。这个计划通过利用新的多版本Java jar包来支持Java9 j.u.c.Flow 类型
由于这里有一个接口去扩展,Rxjava2 将真正是“响应式扩展”。RxJava 1.x 没有一个基础接口 或者 协议来扩展 因此 必须通过 口头来定义它。RxJava2 希望 基于一种 高性能,经久沙场的,轻量级(响应流单一依赖),非固执的响应流的实现 且 j.u.cFlow 提供一个带参数化并发的高阶函数的库。
公共库的API
响应流的一个巨大的价值是它的库公开的公共API。当解耦具体的实现时,如下是一些指引和建议关于如何使用响应流和RxJava 来创建响应库。
公开响应式API的优点而不是Rxjava的。
轻量级:接口上没有任何具体的实现,且非常轻量级的依赖。这能让依赖图和字节位很小。
前瞻性:由于响应式API如此简单且正成为JDK9中的一部分。在公开异步访问数据方面是具有前瞻性的。JDK9 的 j.u.c.Flow 的API 匹配 响应流的API ,所以任何实现 响应流的Publisher 将也能够实现 Flow.Publisher 类型。
交互性:一个以响应流类型公开的API 能轻易的被任何实现消费,例如RxJava,Akka,Reactor.
公开响应流API的缺点而不是Rxjava的
一个响应流 Publisher本身不是非常有用,如果没有像flatMap那样的高阶函数,那么 它仅仅是一个更好的回调。这意味着 消费Publsher 几乎将总是需要去转化或者封装到一个响应流的实现当中。总是将具体的实现包装到Publisher中是啰嗦和笨拙的。如果JVM支持扩展函数将会变的优雅,因为它不是具体的和啰嗦的。
具体来说,响应流和 Flow Publisher 接口不提供任何操作符的实现,例如 flatmap,merge,filter,take,zip 和 许多其他的 用于组合 和转换异步流的操作符 。 一个Publisher 仅仅用于被订阅。
响应流规范和二进制构件不提供具体的Publisher实现。一般而言,一个库需要被RxJava 或者 Akka 等流提供能力为了它的内部使用或者仅仅提供一个可用的支持背压语义(这对正确实现是意义非凡的)的Publisher。
推荐的方法:
既然 响应流 已经实现了1.0 我们建议使用它作为核心api,这些api是用于交互的。这将会允许采用不带强依赖的异步流语义 在任何单一的实现上。这也意味着这些API的消费者能够选用RxJava1.x ,RxJava2.x Akka流,Reactor 或者最合适这些api的其他流组合库。 它也有更好的前瞻性,例如 从RxJava 1.x 移到 2.x 的例子。
然而,为了避免以上所列的缺点,我们也建议让消费它们变的容易,即开发者不需要明确的封装他们选择的组合库的API. 因此我们建议在最核心的api上为流行的响应流实现提供封装的模块,否则你的消费者将需要自己做这些事情。
注意,如果java提供了扩展函数,那么这个方法不是必要的,但是,在Java提供这一点之前(如果不是很快的话),以下是一种利用优点和弥补缺点的方法。
例如:一个数据库驱动可能有如下一些模块:
//公开响应流 Publisher APIs的核心库
- async-database-driver
// 封装具体实现的集成jar包
- async-database-driver-rxjava1
- async-database-driver-rxjava2
- async-database-driver-akka-stream
核心库可能公开像下面这样的API:
package com.database.driver;
public class Database {
public org.reactivestreams.Publisher getValue(String key);
}
RxJava 1.x包装器可以是一个单独的模块,它提供像这样的RxJava特定api
package com.database.driver.rxjava1;
public class Database {
public rx.Observable getValue(String key);
}
核心Publisher API能被封装的像这样简单:
public rx.Observable getValue(String key) {
return RxReactiveStreams.toObservable(coreDatabase.getValue(key));
}
RxJava2.x 封装将与这不同(一单2.x是可用的)
package com.database.driver.rxjava2;
public class Database {
public io.reactivex.Observable getValue(String key);
}
Akka 流封装将变成这样:
package com.database.driver.akkastream;
public class Database {
public akka.stream.javadsl.Source getValue(String key);
}
开发者可以选择直接依赖 async-database-driver 的API ,但是大多数将使用其中一个包装器,这个封装器支持他们选择的组合库