RxJava进阶那些事
前言
rxjava很多人都用过,但是你真的对它足够了解吗?不妨来看看文中的这些问题你能否全部答对。阅读本文,你可以有如下收获:
1、熟悉RxJava主要操作符的应用和使用场景;
2、对RxJava原理的核心逻辑有清楚的解释,给看不懂源码的你一个更清晰的思路;
3、RxJava的原理、设计思想和最大卖点是什么?
目前很多人都在用【retrofit+okhttp+rxjava】做网络请求,这似乎成了Android开发的标配了。但是你真的熟悉/了解这几个工具吗?先不用着急回答,我先抛出几个问题,如果你都掌握了,那本文大多数内容,你已经不用看了;如果你觉得自己回答的不是很好,那本文应该可以让你在网络请求的实战和思维上有所收获。这里以RxJava为例:
- Q1:你用过RxJava的哪些功能?除了如下网络请求异步操作,还有别的吗?(提示:操作符)
public interface ApiService {
@GET("/")
Observable<Login> getData();
}
ApiService apiService = retrofit.create(ApiService.class);
Observable<Login> observable = apiService.getData();
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Login>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Login login) {
Log.e(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.toString());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
- Q2:你项目中是如何处理嵌套(连续)的网络请求的?比如说登陆成功了,再根据uid去获取用户相关信息。
- Q3:你是如何处理多请求,单输出的?也就是说获取多个接口数据,然后合并结果做下一步数据处理
- Q4:rxJava的线程调度器是如何实现线程切换的?
- Q5:rxJava的原理是什么?与其他的异步通信机制Handler、AsyncTask有什么区别?
关于rxjava,你最少且必要知道这些
1、操作符分类与详解
2、优点,使用场景
3、线程调度器
4、原理和设计模式
关于1,2就不在这里细说了,推荐几篇非常优秀的系列文章。
备注:上面的Q1~Q3的答案也在这里面
RxJava2操作符详解
RxJava2实战应用解析
RxJava2线程切换原理(上)
RxJava2线程切换原理(下)
使用场景:
- 单请求异步处理
- 嵌套请求(flatmap)
- 多请求合并处理(多输入,单输出)
- 定时轮训
- 错误重连
线程调度器
注意:这部分,我希望你是看过源码之后再来阅读,不然你会看懵的,因为这不是详细的源码解读,而是给那些看不懂源码,或者说看了之后很懵、找不到关键点的同学看的。
这里涉及2个方法
切换上游线程
.subscribeOn(Schedulers.io())
切换下游线程
.observeOn(AndroidSchedulers.mainThread())
先抛出几个问题
- 线程切换是如何实现的?(提示:Thread,Handler)
-
subscribeOn
和observeOn
顺序变换有影响吗?observeOn
放在其他操作符比如flatmap
之前,线程有什么变化?subscribeOn
在整个链式调用不同的位置有什么不同吗?(这里需要搞清楚)
RxJava2的订阅原理是执行subscribe
时调用Observable
的各个子类的subscribeActual
方法,这个方法最终会调用observer
相关的订阅方法。
这里我是不会通过完整的代码去告诉你上游是如何切换到子线程,下游最后又如何切换到主线程的,我只是想把切换的核心点、主干逻辑告诉你,这样你再去看源码的时候就不会迷茫了。
subscribeOn(Schedulers.io())
这里以常用的写法为例,把上游切换到子线程执行。
注意:这里会吧subscribeOn()
和Schedulers.io()
拆开来讲。
首先subscribeOn()
返回的ObservableSubscribeOn
内部是将订阅操作subscribe
放到一个Runnable
执行
//ObservableSubscribeOn.SubscribeTask
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//重点在这里
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//上游的Observable和observer的订阅
source.subscribe(parent);
}
}
scheduler.scheduleDirect()
内部也仅仅是对Runnable
的再次包装,并没有创建子线程的操作。
Schedulers.io()
内部会创建线程池,并把上面定义的Runnable
放到子线程操作。Schedulers.io()
返回一个IoScheduler
,在它内部经过一系列调用,最终在NewThreadWorker
创建了线程池,然后通过线程池中的子线程去执行上面订阅的Runnable
,至此上游的线程切换就完成了。
创建线程池
IoScheduler
--->createWorker()
--->EventLoopWorker
--->ThreadWorker
--->NewThreadWorker 构造方法
Runnable任务执行
IoScheduler
--->createWorker()
--->EventLoopWorker
--->schedule()
--->threadWorker.scheduleActual()
---NewThreadWorker.scheduleActual()
observeOn(AndroidSchedulers.mainThread())
切换下游线程
首先observeOn()
最终会调用ObservableObserveOn. subscribeActual
。重点关注scheduler.createWorker()
,这个Worker 对象实际上是在AndroidSchedulers.mainThread()
内部的HandlerScheduler
中生成的,并且持有主线程 handler 的引用。
//ObservableObserveOn.Java
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//重点在这里
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
接着看ObserveOnObserver
,它内部会调用worker.schedule(this);
这里实际上调用的是HandlerWorker.schedule()
,这里面会有一个主线程的Handler对象,然后把特定的线程任务通过handler.sendMessageDelayed()
方法转移到主线中去执行。
至此,关于线程切换的主要逻辑就讲完了。
RxJava2原理总结
-
RxJava2是基于事件流的异步操作库,事件流是通过
subscribe
触发的(实际上是subscribeActual
),每个Observable
子类的subscribeActual
实现逻辑不同。 -
从下往上订阅,事件从上往下发射。
-
我认为RxJava最大的卖点/亮点是事件驱动型编程,可以通过Observable 的操作符对事件在时间和空间维度进行重新组织,而观察者的逻辑几乎不需要修改。