平行流、字符串、实用操作符
介绍:
2.0.5版本引入的ParallelFlowable API 允许并行执行一系列选择的操作符,例如 map,filter,concatMap,flatMap,collect,reduce 等等。 注意:对于Flowable(一个特定的子域语言)这是一个并行的模块而不是新的响应基础类型。
因此,几个典型的操作符例如take,skip 和许多其他的是不可用的且这儿没有ParallelObservable
,因为 像期望那样内部的并行操作符队列不发生洪泛,背压是至关重要的,我们希望并行是因为单线程处理数据是缓慢的。
最简单的方式进入并行世界是使用 Flowable.parallel
:
ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();
默认情况下,并行级别被设置为可用的cpu数量。
(Runtime.getRuntime().availableProcessors()
) 和 从原序列预取的数量 被设置到Flowable.bufferSize()
(128).两者都可以通过parallel()的重载函数来指定。
ParallelFlowable 遵循Flowable同样的异步参数原则,因此, parallel() 本身不引入源序列的异步消费,但仅准备并行流,异步通过runOn(Scheduler) 操作符被定义。
ParallelFlowable<Integer> psource = source.runOn(Schedulers.io());
并行等级(ParallelFlowable.parallelism()
) 不必匹配Scheduler的并行等级。runOn操作符将使用和并行源定义的一样多的Scheduler.Worker实例。 这允许ParallelFlowable
为CPU密集的任务工作通过Schedulers.computation(),阻塞IO 通过Schedulers.io() 绑定任务且通过TestScheduler单元测试。你也可以指定预取数量在runOn上。
一旦必要的平行操作已经被应用,你可以通过ParallelFlowable.sequential()操作符返回连续的Flowable
Flowable<Integer> result = psource.filter(v -> v % 3 == 0).map(v -> v * v).sequential();
注意: sequential
不保证在并行运算符之间流动的值之间的顺序。
String Observables
StringObservable 类包含一些函数,这些函数代表一些操作符,特别是处理基于字符串的序列和流的Obseravable.这些包括:
- byLine( )— 通过把源序列处理为一个流且以换行符为分割来把一个Observable的字符串转换成行的Observable
- decode( )— 把一个多字节字符转换成一个发射遵守字符边界的字节数组的Observable
- encode( )— 把一个发射字符串的Observable转换成一个遵守源字符串中多字节字符的字符边界的字节数组的Observable.
- from( )— 转换一个字符流或者一个Reader 为一个发射字节数组或者字符串的Observable.
- join( )— 把一个发射字符串序列的Observable转换成一个发射一个单一字符串,该字符串是由那些字符串序列拼接而成。
- split( )— 把一个Observable的字符串转换成一个Observable,这个Observable把源序列当做一个流,且该流是以特定正则边界分割开
- stringConcat( )— 把一个发射一系列字符串的Observable转换成一个发射由上述字符串序列拼接成的单一字符串的Observable
Transforming Observables
这部分说明你可以转换被Observable发射的item的操作符
- map( )— 通过对每一个被Observable发射的item应用一个函数来转换他们
- flatMap( )****, ****concatMap( )****, and ****flatMapIterable( ) —把被Observable发射的多个item转换成多个Observable或多个Iterable,接着压成一个单一的Observable.
- switchMap( )— 把被Observable发射的多个item转换成多个Observable,并且镜像那些最近被转换的Observable发射的item
- scan( )— 对每一个被Observable发射的item值应用一个函数,且发射后续的每一个值。
- groupBy( )— 把一个Observable划分成一系列通过key组织的Observable,这些Observable从原始的Observable发射成群的item。
- buffer( )— 周期性的从一个Observable聚集item成一个bundle,且发射这些bundle而不是一次发射一个item。
- window( )— 周期性的从一个Obseravble再细分item成一个Observable window 且发射这些Window而不是一次发射一个item.
- cast( )— 在再次发射他们之前,把来自于源Observable的所有item转换成特殊的类型。
Observable Utility Operators
这部分列出了不同的为Observable工作的实用操作符。
- materialize( )— 把一个Observable转换一系列通知。
- dematerialize( )— 把一个物化的Obsrevable回退到非物化的形式
- timestamp( )— 为每一个被Observable发出的item附上时间戳
- serialize( )— 强制一个Observable执行序列化调用
- cache( )— 记住被Observable发射的item序列且为未来的订阅者发射相同的序列。
- observeOn( )— 指定Subscriber应该在哪个Scheduler观察Observable.
- subscribeOn( )— 当一个订阅被执行的时候,指定一个Observable应该使用哪个Scheduler。
- doOnEach( )— 无论何时Observable发射一个item,注册一个action.去执行
- doOnNext( )— 就在Observable传入onNext事件顺流而下之前,注册一个action去执行
- doAfterNext( ) —就在Observable传入onNext事件顺流而下之后,注册一个action去执行
- doOnCompleted( )— 当一个Observable成功完成,注册一个action去执行
- doOnError( )— 当一个Observable带错误完成,注册一个action去执行
- doOnTerminate( )— 在一个Observable结束之前,不管成功或是出错,注册一个action去调用。
- doAfterTerminate( )— 在一个Observable结束之后,不管成功或是出错,注册一个action去调用。
- doOnSubscribe( )— 当一个Observabler订阅一个Observable,注册一个action去调用。
- 1.xdoOnUnsubscribe( ) — 当一个Observable取消订阅一个Observable,注册一个action去调用
- finallyDo( )— 当一个Observable完成,注册一个action去执行。
- doFinally( )— 当一个Observable结束或者被处理,注册一个action去执行
- delay( )— 从一个Obseravble按一定数量向将来转移一些发射对象
- delaySubscription( )— 持有一个Subscriber的订阅请求一段指定的时间在传入它到源Observable之前。
- timeInterval( )— 在源Observable的两个连续的发射之间暂停一段时间
- using( )— 创建一个与Observable相同生命周期的可支配的资源
- single( )— 如果Observable再发射一个item后完成了,那么返回这个item,否则抛出一个异常
- singleOrDefault( ) —如果Observable再发射一个item后完成了,那么返回这个item,否则返回默认item
- repeat( )— 创建一个重复发射特殊item或者item序列的 Observable
- repeatWhen( ) —创建一个重复发射特殊item或者item序列的 Observable,取决于第二个Observable的发射
Plugins
插件允许你从几个方面改变Rxjava默认的行为
通过改变一系列默认的计算,i/o 和新线程调度器
通过注册RxJava可能遇到的特别错误的处理器
通过注册能够注意到几个常规RxJava活动的发生的函数
RxJavaHooks
新的RxJavaHooks允许你连接到 Observable、Single、Completable类型及被Schedulers返回的Scheduler们 的生命周期 且为不可交付的问题提供一个全方位的解决方案。
你现在可以在运行期改变这些hook 且不必再通过系统参数准备hook。因此用户可能仍要依赖于旧的hook系统,RxjavaHooks 默认委托给旧的Hook。
RxJavaHook 有不同种类的hook的setter和getter
image.png读取和改变这些hook是线程安全的
你也可以通过clear()清除所有的hook 或者 通过reset()重置到默认的行为(委托给旧的RxJavaPlugin系统)
例子:
RxJavaHooks.setOnObservableCreate(o -> {
System.out.println("Creating " + o.getClass());
return o;
});
try {
Observable.range(1, 10)
.map(v -> v * 2)
.filter(v -> v % 4 == 0)
.subscribe(System.out::println);
} finally {
RxJavaHooks.reset();
}
此外,RxJavaHook 提供所谓的装配跟踪特性.这嵌入一个自定义Observable,Single,Completable到他们捕捉目前栈迹的链中 当这些操作符被实例化的时候(装配时间)。无论何时 一个错误信号通过onError被发出,这些附加到装配时间的栈轨迹的中间件最终造成这个异常。这可能帮助定位代码库的问题序列。
Example:
RxJavaHooks.enableAssemblyTracking();
try {
Observable.empty().single()
.subscribe(System.out::println, Throwable::printStackTrace);
} finally {
RxJavaHooks.resetAssemblyTracking();
}
这将打印像下面的结果:
java.lang.NoSuchElementException
at rx.internal.operators.OnSubscribeSingle(OnSubscribeSingle.java:57)
...
Assembly trace:
at com.example.TrackingExample(TrackingExample:10)
栈轨迹字符串在支持debug和发现运行链中不同操作符的状态也是可用的。
栈轨迹通过移除不相关的入口例如线程入口,单元测试和跟踪系统入口本身 来被过滤去减少噪音
RxJavaSchedulersHook
Deprecated
这个插件允许你去重载默认的计算,i/o 和新线程Scheduler .集成 类RxJavaSchedulersHook 和重写如下方法 :
- Scheduler getComputationScheduler( )
- Scheduler getIOScheduler( )
- Scheduler getNewThreadScheduler( )
- Action0 onSchedule(action)
接着按如下步骤:
-
创建一个新的你已经实现的
RxJavaDefaultSchedulers
子类的对象 -
通过RxJavaPlugins.getInstance( )获取全局的
RxJavaPlugins
实例 -
传入默认的scheduler对象到该实例的registerSchedulersHook( )函数
当你完成了这些,RxJava 开始使用你的函数返回的Scheduler而不是它内嵌的默认的
RxJavaErrorHandler
Deprecated
该插件允许你注册一个将会处理错误的函数,该错误将被传到SafeSubscriber.onError(Throwable)。(SafeSubscriber
用于封装即将到来的Subscriber当subscribe()被调用)。为此,继承类RxJavaErrorHandler``且
重写该函数:
- void handleError(Throwable e)
接着按如下步骤:
-
创建一个你实现的
RxJavaErrorHandler
子类的新的对象 -
通过RxJavaPlugins.getInstance( )获取全局的
RxJavaPlugins
实例 -
传入错误处理器到该实例的registerErrorHandler( )函数
当你完成了这些,RxJava将开始使用你的错误处理器来处理传给SafeSubscriber.onError(Throwable)的错误。
例如:
RxJavaPlugins.getInstance().reset();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override
public void handleError(Throwable e) {
e.printStackTrace();
}
});
Observable.error(new IOException())
.subscribe(System.out::println, e -> { });
然而,该调用和操作符链一般情况下将不会在每个阶段被触发。
Observable.error(new IOException())
.map(v -> "" + v)
.unsafeSubscribe(System.out::println, e -> { });
RxJavaObservableExecutionHook
Deprecated
该插件允许你注册RxJava将调用某些常规的RxJava活动的函数,例如:日志或者metrics-collection purposes。为此,继承RxJavaObservableExecutionHook
类 且重写这些方法中的任何一个或全部
接着按如下步骤:
-
创建一个新的你实现的
RxJavaObservableExecutionHook
子类的实例。 -
通过
RxJavaPlugins.getInstance( )
或者全局的RxJavaPlugins
实例 -
传入你的执行hook对象到该实例的registerObservableExecutionHook( )函数中。
当你完成了这些,RxJava将会调用你的函数当遇到特定的被设计需要注意的条件