RxJava2.x (二) 操作符和调度器
上一篇文章介绍了RxJava基础,这一篇介绍一下RxJava2.0中操作符和调度器的使用。
一、操作符
map()操作符
map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
Observable observable = Observable.just("Hello RxJava2.0")
.map(newFunction() {
@Override
public String apply(@NonNullString s) throws Exception {
return s;
}
});
map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
flatMap()操作符
flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。
filter()操作符
.filter(newPredicate() {
@Override
public boolean test(@NonNullString s)throwsException {
return false;
}
})
filter()操作符,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
take()操作符
take()操作符:输出指定数量的结果。
doOnNext()
doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。
.doOnNext(newConsumer() {
@Override
public void accept(@NonNullString s) throwsException {
Log.e(TAG,"doOnNext");
}
});
以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
二、调度器 Scheduler
Scheduler简介
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)
在RxJava 中,Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景。
Scheduler 的 API
Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Android 还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。 observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
下面用代码展示下线程调度的使用:
Observable.create(newObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e(TAG,"所在线程:"+Thread.currentThread().getName());
e.onNext("Hello RxJava2.x");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(newConsumer() {
@Override
public void accept(@NonNullString s)throws Exception {
Log.e(TAG,s);
Log.e(TAG,"所在线程:"+Thread.currentThread().getName());
}
});
输出结果
com.jumei.android.rxjavademo E/RxJava2.x: 所在线程:RxCachedThreadScheduler-1
com.jumei.android.rxjavademo E/RxJava2.x: Hello RxJava2.x
com.jumei.android.rxjavademo E/RxJava2.x: 所在线程:main
由此可以看到Observable(被观察者)发送事件的线程的确改变了, 是在一个叫 RxCachedThreadScheduler-1的线程中发送的事件, 而Observer(观察者)仍然在主线程中接收事件。