RxJava 2 源码解析之创建-订阅-变换-发布
本文源码基于2.1.8版本。
一段非常典型RxJava
使用流程:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
})
.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("RxJava2", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
那么它内部是怎么执行起来的呢?
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins.onAssembly()在这里仅仅是返回了参数中的Observable而已,重点关注ObservableCreate:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// ... 省略
}
它的构造方法仅仅是保存原始的ObservableOnSubscribe。那么,最终还是看看ObservableOnSubscribe:
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
它只是一个接口,而且RxJava没有它的默认实现,我们全都需要自己实现。
先跳过subscribeOn和map方法,直接看subscribe():
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
这里代码虽然很多,但是核心代码就是
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
而已。
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
这里基本也跟之前一样,没干什么事,直接返回原来的Observer。那么subscribeActual呢?
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
* <p>There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
它只是一个抽象方法,那么它的实现在哪里呢?
之前我们创建Observable的时候:我们在create()方法里创建了一个ObservableCreate
对象,就是它了,看看它的subscribeActual()是怎样的:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
首先创建一个继承自Emitter的CreateEmitter,然后调用Observer的onSubscribe()方法,
再调用ObservableOnSubscribe的subscribe()方法,这个方法就是我们的代码里的回调了:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
})
然后我们在回调里执行了
emitter.onNext(1);
进去看看它做了什么:
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
Emitter只是一个接口,它的实现是CreateEmitter,再看看它儿子长什么样:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public boolean isDisposed() {
return emitter.isDisposed();
}
// 其他代码省略
它是ObserverbleCreate的静态内部类!它仅仅就是调用Observer的onNext方法而已,其他也差不多。
至此,一个RxJava2的基本执行流程就分析完了,看起来还是很简单的。
那么再看看map操作:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
这套路跟之前的create很像,而且ObservableMap跟之前的ObservableCreate也很类似,它继承自AbstractObservableWithUpstream,这个暂时不管,也是继承自Observable的。
不过这里要注意的是,这里执行的是ObservableCreate的实例的map方法。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
// 给Observable重新订阅一次监听者
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
// 省略
}
}
我们之前在分析create的时候,它创建了一个ObservableCreate对象,在我们subscribe的时候,调用了这个对象的subscribeActual()方法,最终它在这里调用了ObservableOnSubscribe的subscribe()方法。
而这个MapCreate也是类似,只不过ObservableCreate中保存的source是ObservableOnSubscribe,而MapCreate中保存的,是ObservableSource,它是Observable的父类,因为,Observable在create后就已经创建成功了嘛。
在我们这个例子中,MapCreate中保存source的是之前创建好的ObservableCreate实例。它有个静态内部类MapObserver,它是BasicFuseableObserver的子类,而这个家伙是Observer的子类。
OK,现在再重新看看加入了map操作符之后,新的subscribe执行流程:
- 首先执行subscribe(),不过这时候执行的是ObservableMap实例的subscribe方法
- 接着会调用ObservableMap的subscribeActual方法
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
在这里创建了MapObserver实例,并调用ObservableCreate实例的subscribe()方法,并将这个Observer传入进去。也就是说,这时候ObservableCreate的观察者是一个中间人。
- 接下来就是ObservableCreate实例的subscribe过程了,在它看来,它不关心后面有多少个变换,还是照着自己原来的老步骤执行。
- ObservableCreate实例执行subscribeActual()时,调用MapObserver的onSubscribe()方法,然后再调用ObservableSubscribeOn的subscribe()方法。
- 前面说过,MapObserver是继承BasicFuseableObserver的,它重写了onSubscribe方法,如下:
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
if (beforeDownstream()) {
actual.onSubscribe(this);
afterDownstream();
}
}
}
这里的actual就是实际上我们定义的真正的Observer。
- 接下来ObservableSubscribeOn的subscribe()执行的时候,我们就开始发送事件了。我们调用了CreateEmitter的onNext()方法发送事件。
再次贴出它的onNext()代码:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
前面说过,它的observer现在已经是MapObserver了,所以来到了这里:
MapObserver.java
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
重点关注这两行:
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
actual.onNext(v);
这里的mapper就是我们map()操作符中创建的Function参数了,Function接口定义如下:
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
这时,我们的map就回调成功了,完了之后它就调用了真正Observer的onNext()方法。
至此,一次包含map()操作符的RxJava2的事件创建、订阅、map转换、事件发射、事件响应流程就解析完了。
从以上解析可以看出,不含map操作的时候,流程还是很简单的,包含了map就复杂些了。那么看完后,抛出个问题:map究竟是怎样实现变换的呢?如果我再给你多加几次map操作呢?哈哈,是不是有点晕?
其实总结起来,它就是这么一个思想:
- 每加一个操作符,它都会增加一个Observable和一个Observer
- 后来加入的Observable会持有前一个Observable的引用,用来调用前一个Observable的subscribe方法。
- 前一个Observable的Observer是后一个Observable创建的Observer
- 所有其他Observable的source都是上一个Observable,唯独ObservableCreate的source是ObservableOnSubscribe,它并不继承ObservableSource,它是事件的源头
- Observer的onSubscribe()由ObservableCrate()发起
- 这样就形成了两条链:订阅链和响应链
订阅链是从后往前订阅,响应链是从前往后执行。
这样就完成了操作符的无缝衔接。
至于那些变换,只是在链条中的事件形式变换而已。
上个总结图:
rxjava2-create-subscribe-emit-flow.png
这样就很清晰了,整个创建流程是从上到下,然后整个订阅流程又从下到上,事件发布时,又是从上到下,有木有,有木有,有木有!
再去看看其他操作符,都是这个套路,理解了这个,理解RxJava2就容易多了。