平行流、字符串、实用操作符

2018-09-29  本文已影响51人  CyrusChan

介绍:

2.0.5版本引入的ParallelFlowable API 允许并行执行一系列选择的操作符,例如 map,filter,concatMap,flatMap,collect,reduce 等等。 注意:对于Flowable(一个特定的子域语言)这是一个并行的模块而不是新的响应基础类型。

因此,几个典型的操作符例如take,skip 和许多其他的是不可用的且这儿没有ParallelObservable ,因为 像期望那样内部的并行操作符队列不发生洪泛,背压是至关重要的,我们希望并行是因为单线程处理数据是缓慢的。

最简单的方式进入并行世界是使用 Flowable.parallel:

ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();

默认情况下,并行级别被设置为可用的cpu数量。

(Runtime.getRuntime().availableProcessors()) 和 从原序列预取的数量 被设置到Flowable.bufferSize() (128).两者都可以通过parallel()的重载函数来指定。

ParallelFlowable 遵循Flowable同样的异步参数原则,因此, parallel() 本身不引入源序列的异步消费,但仅准备并行流,异步通过runOn(Scheduler) 操作符被定义。

ParallelFlowable<Integer> psource = source.runOn(Schedulers.io());

并行等级(ParallelFlowable.parallelism()) 不必匹配Scheduler的并行等级。runOn操作符将使用和并行源定义的一样多的Scheduler.Worker实例。 这允许ParallelFlowable 为CPU密集的任务工作通过Schedulers.computation(),阻塞IO 通过Schedulers.io() 绑定任务且通过TestScheduler单元测试。你也可以指定预取数量在runOn上。

一旦必要的平行操作已经被应用,你可以通过ParallelFlowable.sequential()操作符返回连续的Flowable

Flowable<Integer> result = psource.filter(v -> v % 3 == 0).map(v -> v * v).sequential();

注意: sequential 不保证在并行运算符之间流动的值之间的顺序。

String Observables

StringObservable 类包含一些函数,这些函数代表一些操作符,特别是处理基于字符串的序列和流的Obseravable.这些包括:

Transforming Observables

这部分说明你可以转换被Observable发射的item的操作符

Observable Utility Operators

这部分列出了不同的为Observable工作的实用操作符。

Plugins

插件允许你从几个方面改变Rxjava默认的行为

通过改变一系列默认的计算,i/o 和新线程调度器

通过注册RxJava可能遇到的特别错误的处理器

通过注册能够注意到几个常规RxJava活动的发生的函数

RxJavaHooks

新的RxJavaHooks允许你连接到 Observable、Single、Completable类型及被Schedulers返回的Scheduler们 的生命周期 且为不可交付的问题提供一个全方位的解决方案。

你现在可以在运行期改变这些hook 且不必再通过系统参数准备hook。因此用户可能仍要依赖于旧的hook系统,RxjavaHooks 默认委托给旧的Hook。

RxJavaHook 有不同种类的hook的setter和getter

image.png

读取和改变这些hook是线程安全的

你也可以通过clear()清除所有的hook 或者 通过reset()重置到默认的行为(委托给旧的RxJavaPlugin系统)

例子:

RxJavaHooks.setOnObservableCreate(o -> { 
    System.out.println("Creating " + o.getClass());
    return o; 
});
try {
    Observable.range(1, 10)
    .map(v -> v * 2)
    .filter(v -> v % 4 == 0)
    .subscribe(System.out::println);
} finally {
    RxJavaHooks.reset();
}

此外,RxJavaHook 提供所谓的装配跟踪特性.这嵌入一个自定义Observable,Single,Completable到他们捕捉目前栈迹的链中 当这些操作符被实例化的时候(装配时间)。无论何时 一个错误信号通过onError被发出,这些附加到装配时间的栈轨迹的中间件最终造成这个异常。这可能帮助定位代码库的问题序列。

Example:

RxJavaHooks.enableAssemblyTracking();
try {
    Observable.empty().single()
    .subscribe(System.out::println, Throwable::printStackTrace);
} finally {
   RxJavaHooks.resetAssemblyTracking();
}

这将打印像下面的结果:

java.lang.NoSuchElementException
at rx.internal.operators.OnSubscribeSingle(OnSubscribeSingle.java:57)
...
Assembly trace:
at com.example.TrackingExample(TrackingExample:10)

栈轨迹字符串在支持debug和发现运行链中不同操作符的状态也是可用的。

栈轨迹通过移除不相关的入口例如线程入口,单元测试和跟踪系统入口本身 来被过滤去减少噪音

RxJavaSchedulersHook

Deprecated

这个插件允许你去重载默认的计算,i/o 和新线程Scheduler .集成 类RxJavaSchedulersHook 和重写如下方法 :

接着按如下步骤:

  1. 创建一个新的你已经实现的RxJavaDefaultSchedulers 子类的对象

  2. 通过RxJavaPlugins.getInstance( )获取全局的RxJavaPlugins 实例

  3. 传入默认的scheduler对象到该实例的registerSchedulersHook( )函数

当你完成了这些,RxJava 开始使用你的函数返回的Scheduler而不是它内嵌的默认的

RxJavaErrorHandler

Deprecated

该插件允许你注册一个将会处理错误的函数,该错误将被传到SafeSubscriber.onError(Throwable)。(SafeSubscriber 用于封装即将到来的Subscriber当subscribe()被调用)。为此,继承类RxJavaErrorHandler``且 重写该函数:

接着按如下步骤:

  1. 创建一个你实现的RxJavaErrorHandler 子类的新的对象

  2. 通过RxJavaPlugins.getInstance( )获取全局的RxJavaPlugins 实例

  3. 传入错误处理器到该实例的registerErrorHandler( )函数

当你完成了这些,RxJava将开始使用你的错误处理器来处理传给SafeSubscriber.onError(Throwable)的错误。

例如:

RxJavaPlugins.getInstance().reset();

RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
    @Override
    public void handleError(Throwable e) {
        e.printStackTrace();
    }
});

Observable.error(new IOException())
.subscribe(System.out::println, e -> { });

然而,该调用和操作符链一般情况下将不会在每个阶段被触发。

Observable.error(new IOException())
.map(v -> "" + v)
.unsafeSubscribe(System.out::println, e -> { });

RxJavaObservableExecutionHook

Deprecated

该插件允许你注册RxJava将调用某些常规的RxJava活动的函数,例如:日志或者metrics-collection purposes。为此,继承RxJavaObservableExecutionHook 类 且重写这些方法中的任何一个或全部

image.png

接着按如下步骤:

  1. 创建一个新的你实现的RxJavaObservableExecutionHook 子类的实例。

  2. 通过RxJavaPlugins.getInstance( )或者全局的RxJavaPlugins 实例

  3. 传入你的执行hook对象到该实例的registerObservableExecutionHook( )函数中。

当你完成了这些,RxJava将会调用你的函数当遇到特定的被设计需要注意的条件

上一篇下一篇

猜你喜欢

热点阅读