RxJava2 源码总结
2017-12-31 本文已影响26人
SScience
总感觉跟着源码走过程过段时间又会忘记,又得翻一遍源码,所以在第一次学习源码时,把领悟的关键点记录下来,以后回看只要稍微浏览下源码,就能迅速明白它的思想(也许以后还会有更深刻的理解😂)。
本学习源码基于 RxJava 2.1.7
源码地址:https://github.com/ReactiveX/RxJava
一,事件产生和消费--create()
首先,以最简单的使用方式来说:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(">>>>>>>>", "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(">>>>>>>>", "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.e(">>>>>>>>", "onError:" + e.toString());
}
@Override
public void onComplete() {
Log.e(">>>>>>>>", "onComplete");
}
});
// ObservableCreate#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
...
}
-
Observable
是个抽象类,而Observable.create()
方法返回的对象ObservableCreate
(继承于Observable
),可以看作是事件统一管理者。 -
ObservableCreate
在创建的时候保存了用于事件实际生产者的ObservableOnSubscribe
,在事件订阅Observable#subscribe()
时,传入了事件消费者Observer
。 - 在事件订阅时,
ObservableCreate#subscribeActual()
方法中创建中间者CreateEmitter
,CreateEmitter
把事件消费者Observer
保存起来,然后调用事件生产者ObservableOnSubscribe#subscribe()
方法传入CreateEmitter
(CreateEmitter
是方法subscribe()
的第一个参数的ObservableEmitter
子类)。 - 然后
CreateEmitter
调用自己的onNext()
方法生产事件,在onNext()
方法中把事件传给消费者Observer#onNext()
。 - 由于
CreateEmitter
又实现了Disposable
,在ObservableCreate#subscribeActual()
方法中调用了事件消费者Observer#onSubscribe()
把自己传入消费者,所以在消费者的回调方法中,可以实现终止事件传给消费者dispose()
(因为CreateEmitter#onNext()
等方法中会首先判断isDisposed()
)。
二,操作符--map()
Observable.create(...)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
.subscribe(···);
// ObservableMap#subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
-
map()
方法返回的是新的包装Observable
即ObservableMap<T, U>
(装饰着模式?),ObservableMap<T, U>
中保存了create()
返回了上一节的ObservableCreate
。 - 然后调用
Observable#subscribe()
开始订阅事件,实际是首先调用ObservableMap<T, U>#subscribeActual()
方法,该方法里再调用create()
方法返回的ObservableCreate#subscribeActual()
。 -
ObservableMap<T, U>#subscribeActual()
方法把传入的消费者Observer
包装为MapObserver
,再传给ObservableCreate#subscribeActual()
开始生产事件。 - 在新的
Observer
即MapObserver
中,主要任务是把事件生产者生产的事件通过Function
转化为对应的类型。
三,线程调度--subscribeOn()
Observable.create(...)
.subscribeOn(Schedulers.io())
.subscribe(···);
// ObservableSubscribeOn#subscribeActual
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
-
subscribeOn()
返回的又是新的包装Observable
即ObservableSubscribeOn
(和上一节很像),ObservableSubscribeOn
自然又是保存了create()
返回了的ObservableCreate
。 - 开始订阅事件时,实际是首先调用
ObservableSubscribeOn#subscribeActual()
方法,方法里又包装新的Observer
即SubscribeOnObserver
。 - 通过
scheduler.scheduleDirect(runnable)
在subscribeOn()
指定的线程直接调用run()
方法,方法里再调用ObservableSubscribeOn
保存的最初的Observable
即ObservableCreate#subscribeActual()
开始生产事件,然后返回一个 worker 调度管理者(实现Disposable
接口)。 - 新的包装
Observer
即SubscribeOnObserver
把返回的 worker (子线程)加入dispose()
管理。 - 可以发现,当
subscribeOn()
多次调用时,最终只有ObservableCreate
调用的subscribeOn()
(即第一个)起作用,因为每次subscribeOn()
时新创建的包装Observable
即ObservableSubscribeOn
都会保存上一个Observable
,然后订阅时(在run()
方法调用),会调用上一个Observable.subscribe()
,一直调用到最开始的Observable
即ObservableCreate
才开始切换线程并生产事件。
三,线程调度--observeOn()
Observable.create(...)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(···);
// ObservableObserveOn#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
...
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
- 也是和上两节类似,新的包装
Observable
即ObservableObserveOn
。 - 创建一个
AndroidSchedulers.mainThread()
对应的 Worker 。 - 创建新的包装
Observer
即ObserveOnObserver
(又实现了Runnable)传给最开始的ObservableCreate
,这样生产的事件会由ObserveOnObserver
对应的onXXX()处理。 - 在
onNext()
里,首先把生产的数据加入队列,然后切换会observeOn()
指定的线程,最后才把数据取出来传给消费者。 - 可以看到,每次切换线程后会立刻发送数据,所以调用
observeOn()
会生效多次,与subscribeOn()
相反。
本人水平有限,如有错误,欢迎批评指出😊