从源码分析RxJava订阅过程
都知道观察模式吧?
在开始之前让我们简单了解一下观察模式,就是某对象A的变化引起其他多个对象B变化,但是前提是你需要去订阅我,打个比方:就是我的状态发生了改变,那我怎么通知你呢?所以我需要知道的如何去通知其他对象说我这里已经改变了,你看看那需不需要做出改变。就比如微信的订阅号,如果你不订阅,那该订阅号在发布内容也不会通知,这里的订阅号就是被观察者,而用户就是观察者。那怎么说让这两者关联来呢?前面说的订阅号是要提供一个接口,允许用户去订阅的,所以最后就是被观察者和观察者两个都得提供接口,订阅号提供的接口让用户去订阅类比微信号,当订阅号发布内容,就通过这个微信号通知观察者,所以订阅就是这两者的关联点。
开始之前的两个重要的类或接口:Observable
和 Observer
- Observable 它实现ObservableSource接口,通俗来讲Observable就是一个被观察者也有人叫可观察的资源,这里就叫被观察者;
-
Observer 观察者;
涉及的类:
RxJava2.png
订阅流程分析
开始RxJava的订阅流程分析之前,来个简单的栗子,代码如下:
Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("发射 subscribe");
emitter.onComplete();
}
});//ObservableCreate
Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1
observableSubscribeOn.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
showLog("onSubscribe");
}
@Override
public void onNext(String s) {
showLog("onNext");
}
@Override
public void onError(Throwable e) {
showLog("onError");
}
@Override
public void onComplete() {
showLog("onComplete");
}
});
日志结果:
onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3
如上代码,之所以分开来写是为了更清晰的去理解每一步RxJava生成的相关类。
如果你认真看前面的内容,你一下就明白Observable.subscribe()方法也就是订阅的意思,是 Observable
和 Observer
的关联点,也就是被观察者和观察者的关联点,所以我们的分析就从Observable.subscribe(Observer observer)方法开始代码如下:
public final void subscribe(Observer<? super T> observer) {
try {
// .....此处省略几亿代码....
//此方法在Observable类是中是抽象的,注定是子类实现
subscribeActual(observer);
// .....此处省略几亿代码....
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// .....此处省略几亿代码....
}
}
- 上面代码不难理解在subscribe方法中直接就调用了subscribeActual(observer)方法,我可以翻译为
实际订阅
; - subscribe方法是
Observable
类的方法,他是抽象类,传入了一个 Observer 对象,开始的时候栗子我们可以知道Observable是通过我们调用Observable.create(ObservableOnSubscribe) 所创建出来的; - 那subscribeActual在Observable中是抽象方法,肯定是子类去实现了该方法,从第二点知道子类肯定是在Observable.create(ObservableOnSubscribe)中给new出来的,那么接下我们看看Observable.create(ObservableOnSubscribe)方法的实现;
// Observable.create(ObservableOnSubscribe)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// .....此处省略几亿代码....
// 直接就创建了ObservableCreate,并把source作为参数传进去
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
我们从上面代码我们知道在Observable.create(ObservableOnSubscribe)
中直接就创建了ObservableCreate,而ObservableCreate是Observable的子类,并把source作为参数传进去,最后调用RxJavaPlugins.onAssembly方法,我们默认返回ObservableCreate实例,所以Observable.create方法最后返回的是ObservableCreate实例,所以就验证了上面的第三点实际调用的是ObservableCreate.subscribeActual(observer)方法,这是在不考虑其他变换和线程切换的情况,那我们就来看看ObservableCreate.subscribeActual(observer)方法的实现,代码如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//事件发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//直接回调Observer的onSubscribe方法,这个方法是和线程切换无关,只在当前的线程中执行
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
代码不多,也很好理解:
- 首先调用observer.onSubscribe(parent)方法通知Observer已经订阅成功了。
- 最后调用source.subscribe(parent)方法完成订阅,source又是什么呢?我们知道在ObservableCreate是在Observable.create方法时创建的,并把ObservableOnSubscribe传进来,所以source就是ObservableOnSubscribe,直接回调ObservableOnSubscribe.subscribe方法并把CreateEmitter作为参数传递进去,之后再我们是栗子中通过这个对象调研onNext方法或者onComplete方法发射事件;
看一下CreateEmitter的实现,代码如下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//通过构造方法注入 观察者实例
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
// .....此处省略几亿代码....
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
// .....此处省略几亿代码....
}
为了简介清晰我删掉很多无关代码,只保留onNext等这些相关的方法。
- 其实CreateEmitter是Observable的静态内部类,
- 在上面我们知道Observable.subscribeActual方法中创建了CreateEmitter实例并将Observer作为参数通过构造方法注入Observer实例,作为CreateEmitter的成员变量;
- 之后在subscribeActual方法中调用ObservableOnSubscribe.subscribe的方法并把CreateEmitter实例作为方法参数传递进去;
- 简单来说CreateEmitter的作用就是发射事件,里面分装了Observer实例,发射事件就回调到Obsever中的方法,如onNext等方法;
有没有发现从一开始我们就仅仅讲了从Obsevable的创建到订阅,这是比较汉理解的,如果我增加一个map或线程切换呢?这里暂时不展开讲线程切换。
重新把栗子的代码在贴一遍:
Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("发射 subscribe");
emitter.onComplete();
}
});//ObservableCreate
Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Exception {
Log.e("tag", "map");
return "aa";
}
})
observableSubscribeOn.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
showLog("onSubscribe");
}
@Override
public void onNext(String s) {
showLog("onNext");
}
@Override
public void onError(Throwable e) {
showLog("onError");
}
@Override
public void onComplete() {
showLog("onComplete");
}
});
如上代码,订阅流程会和之前的有什么不一样呢?那么我们看个究竟,就从 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())开始,代码如下:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
ObjectHelper.requireNonNull(mapper, "mapper is null");
//这里把上游this传进去也就是source,以便调用上游的subscribe方法
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
从上面代码看,我们知道在map方法中创建了ObservableMap并把上游的Observable参进去了,而我们知道从Observable.subscribe方法开始订阅就会调用 subscribeActual(observer)方法,所以在Observable.subscribe之后就会调用ObservableMap的subscribbeActual方法,代码如下:
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
在ObservableMap的subscribbeActual方法中,直接调用传进来的Observable的subscribe方法又间接调用subscribbeActual方法没所以,订阅的过程实际上是一样的。
总结
- Observable是由上游往下游传递的,并且每个操作符都会创建新的Observable对象包裹上游的实例;
- Observer是由下游往上游传递的,也就是从Observable.subscribe方法开始。