RxJava浅析——事件如何从上游传递到下游
之前学过一阵子RxJava1.x,但没应用到项目中。最近在Android上使用一个Stomp协议的库,这里面用到了RxJava2,所以重新把RxJava给捡起来了。用了一阵子之后觉得光知其然而不知其所以然比较别扭,而且作为开发总是对源码充满着好奇。虽然源码可能晦涩难懂,逻辑千回百转,但如果能从中领悟一点架构设计的精妙之处或许就是值得的。所以就开始了这一段读RxJava2源码之路。
不在这里立任何的Flag!!!总之算是有一个开始。
怕事情说不清楚,所以语言会比较啰嗦,请大家见谅。
本文将会分析最简单最基础的RxJava2的源码——如何将事件从Observable(上游)传递到Observer(下游)。
先上代码(为了能看得清楚,这里不使用lambda写法),没错,本文就是解释一下这段代码会如何执行。
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("abc");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
Log.i(TAG, "onNext " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError", e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
先把几个类罗列一下:
- Observable:上游。事件的源头。
- Observer:下游。事件的处理者。
- ObservableEmitter:事件发送者。
- Disposable:事件切断者。
- ObservableOnSubscribe:一个接口。只是为了将ObservableEmitter返回给上游,以便发送事件。
使用过RxJava2的童鞋应该对Observable
,Observer
,还有这一串链式操作非常熟悉。
先看一下create()
方法干了什么:
//代码位置:io.reactivex.Observable.java
//不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
create()
方法很简单,只给我们返回了一个ObservableCreate
对象。然后把我们传入的ObservableOnSubscribe
对象(source
)传入了ObservableCreate
的构造方法。
ObservableOnSubscribe
是一个接口。我们在create()
方法的入参时给了一个匿名内部类的实现。也就是说这个ObservableOnSubscribe
完全是我们外部给的实现。注意下这个subscribe()
方法的入参是ObservableEmitter
,这个是事件发送器,我们用这个来发送onNext
,onError
,onComplete
事件。
//代码位置:io.reactivex.ObservableOnSubscribe.java
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
接下来就要看ObservableCreate里面干了啥了。
//代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
what??? 仅仅是保存了我们传入的source
。然后就没有然后了。
所以说创建一个Observable的时候其实啥也没有发生,此时我们只有一个上游,没有下游,也没有连接上下游的管道,所以水流不通。我们已经知道下游是Observer了,那么管道是什么呢?
对,就是这个subscribe()
方法。
注意注意,创建上游的时候我们返回的类型其实是ObservableCreate,从签名上可以看到这个类是Observable的子类。因为ObservableCreate没有重写父类的subscribe()
方法,所以我们来看Observable中的subscribe()
方法的实现。
//代码位置:io.reactivex.Observable.java
//已经化简,去掉了一些空检查等代码.
public final void subscribe(Observer<? super T> observer) {
try {
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}
核心代码就一行:subscribeActual(observer);
这个实现,不用说,当然是ObservableCreate给的。
//代码位置:io.reactivex.internal.operators.observable.ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);//Point 1
observer.onSubscribe(parent); //Point 2
try {
source.subscribe(parent);//Point 3
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这段代码也很简洁。重点就3句话。
先来讲Point 1 :创建了一个CreateEmitter,并把我们的下游(Observer)传入了。
这个CreateEmitter是什么鬼?
上文我们说到ObservableEmitter是事件发送器,然而他只是个接口,那么CreateEmitter是他的某个实现啦。
上代码:
//ObservableCreate的静态内部类。
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable{
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
}
创建CreateEmitter
也只是把我们传入的Observer
保存下来。
接着讲Point 2:回调Observer.subscribe()
方法,并把Point 1中创建的CreateEmitter
传递出去。纳尼???Observer.subscribe()
方法入参可是Disposable
啊。果然这个CreateEmitter
也实现了Disposable
接口。所以说这个CreateEmitter
既是事件发送器也是事件阶段器。这很好理解,最方便快捷的操作就是在事件发送的地方截断事件。
最后Point 3: source.subscribe(parent).
啥??? source
是啥,奥,还记得创建Observable
的时候唯一做的事情就是保存了我们传入的ObservableOnSubscribe
对象么?对,现在他给我们回调了。并且把一个事件发送器传递给了我们。这句话就回调到了我们用匿名内部类去实现的ObservableOnSubscribe
接口。
Observable
.create(new ObservableOnSubscribe<String>() {//对对对,这个就是source
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("abc");//对对对,这个e就是parent,就是内部创建的CreateEmitter
}
})
所以呐,拿着这个事件发送器我们就可以发送事件啦!
上文的代码里只发送了一个事件:e.onNext("abc")
。
最后我们就可以来看调用onNext,onComplete,onError事件分别会发生什么了。
//代码位置:当然还是在CreateEmitter啦。因为调用的就是e.onNext("abc")嘛
@Override
public void onNext(T t) {
if (t == null) { //Point 1
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {//Point 2
observer.onNext(t);//Point 3
}
}
Point 1: 因为RxJava2不允许空对象出现啦,包括之前的代码里也有很多空检查的操作。这个要注意了。
Point 2: 就是先检测一下事件有没有被切断,如果被切断了(isDisposed()
返回true
),当然不能往下传了。
Point 3: 还记得我们创建CreateEmitter
时,把我们外部创建的Observer
传入了吗?对,就是这个observer
。所以发现没,就这么调用到了Observer.onNext()
。
没错,事件就这样从传递过来啦。
可以想象,onError和onComplete事件也是类似的。
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t); //point 1
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);// point 2
} finally {
dispose(); //point 3
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();//point 4
} finally {
dispose();//point 5
}
}
}
onError比较特殊,就是如果事件流已经被切断,再次发送onError,会触发point 1处的代码(具体不用挂心,反正就是会抛一个异常,UndeliverableException)。当然onError()
方法传递的Throwable
也不能为空!!!
Point 2 和 Point 4分别是回调外部Observer的onError
和onComplete
方法。这个回调跟onNext
是一毛一样的。
从Point 3和Point 5来看,一旦发生onError或者onComplete事件,RxJava内部就会把事件流切断。切断的机制我们暂时不讲。我们知道CreateEmitter也是一个事件切断器,所以他当然会有dispose()
方法去切断事件,isDispose()
方法判断事件有没有被切断。
OK! 本文要讲的差不多了。最后附上一张类图,帮助理解和回忆。
RxJava2.pngps:之所以我们有时候搞不清楚内部实现的逻辑,是因为对外暴露的永远都是接口,接口,接口,而内部的实现可能有很多其他角色在表演。