RxJava2源码分析——订阅
本文章主要是对RxJava2的订阅流程进行源码分析,先说下我用的RxJava和RxAndroid版本,版本如下:
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
我们先写段示例代码,为了方便理解,我就不用上Lambda和链式调用了,代码如下:
// 创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
Log.i("TanJiaJun", "subscribe");
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
}
});
// 创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext:" + s);
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError");
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete");
}
};
// 订阅
observable.subscribe(observer);
分成三步:
- 创建被观察者(Observable)。
- 创建观察者(Observer)。
- 调用被观察者的subscribe方法,传入观察者,将两者进行关联并且订阅。
源码分析
我们先从subscribe方法入手,代码如下:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 判断observer是不是空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 调用子类的subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看下RxJavaPlugins.onSubscribe方法,代码如下:
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
注释说这个方法会调用关联的钩子函数(hook function),我们看到它会判断一下onObservableSubscribe是不是空,这个变量是通过setOnObservableSubscribe方法赋值的,代码如下:
@SuppressWarnings("rawtypes")
public static void setOnObservableSubscribe(
@Nullable BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableSubscribe = onObservableSubscribe;
}
然而我们没有调用这个方法,所以这里空的,直接返回observer。
我们接着往下看,subscribeActual是个很重要的方法,它是个接口来的,Observable的子类都要去实现这个方法,接下来在讲创建被观察者的时候就会遇到。
我们调用Observable.create方法,代码如下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins.onAssembly方法也是一个钩子函数,代码如下:
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
它会判断onObservableAssembly变量是不是空,这个变量是通过setOnObservableAssembly方法赋值的,代码如下:
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
然而我们没有调用这个方法,所以我们直接看创建的ObservableCreate对象,要注意的点我都写上注释了,代码如下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
// source在我们的示例代码里是上游Observable对象(被观察者)
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 创建CreateEmitter对象,传入下游Observer对象(观察者)
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用下游Observer对象的onSubscribe方法,并且传入CreateEmitter对象
observer.onSubscribe(parent);
try {
// 调用上游Observable对象的subscribe方法,并且传入CreateEmitter对象
source.subscribe(parent);
// 这里可以得出结论,先执行下游Observer的onSubscribe方法,然后执行上游Observable的subscribe方法
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// 发射一个错误的事件
parent.onError(ex);
}
}
// 该类继承了AtomicReference,可以实现原子操作
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
// 传入下游Observer对象
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// 在RxJava2.x版本中,onNext方法不能传null,否则抛出空指针异常
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 当isDisposed方法为false时,调用下游observe的onNext方法,并且传入对应的对象
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
// 在RxJava2.x版本中,onError方法不能串null,否则抛出空指针异常
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
// 当isDisposed方法返回false时,调用下游observer的onError方法,并且传入Throwable对象,然后调用dispose方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
// 当isDispoesed方法返回false时,调用下游observer的onComplete方法,然后调用dispose方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
// 省略部分代码
}
传入的是ObservableOnSubscribe接口,里面有个带ObservableEmitter参数的subscribe方法,代码如下:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
我们示例代码实现了这个方法,代码如下:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
Log.i("TanJiaJun","subscribe");
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
}
});
依次调用了ObservableEmitter的onNext方法和onComplete方法,这里的ObservableEmitter实现类是CreateEmitter,代码如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// parent是CreateEmitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
调用onNext方法和onComplete方法,实际上是调用了下游Observer的onNext方法和onComplete方法,代码如下:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// observer是下游观察者
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
// observer是下游观察者
observer.onComplete();
} finally {
dispose();
}
}
}
也就是调用了我们示例代码中这些方法,代码如下:
@Override
public void onNext(String s) {
Log.i("TanJiaJun",s);
}
@Override
public void onComplete() {
Log.i("TanJiaJun","onComplete");
}
总结一下,整个流程如下:
- 调用上游Observable的subscribe方法,并且传入下游Observer。
- subscribe方法里面执行了Observable的子类ObservableCreate的subscribeActual方法,并且传入下游Observer。
- subscribeActual方法里面会依次执行下游Observer的onSubscribe方法和ObservableOnSubscribe的subscribe方法,从而完成整个订阅流程。
- 如果我们去发射事件,例如示例代码中调用ObservableEmitter的onNext方法和onComplete方法,那么下游Observer的onNext方法和onComplete方法就会执行。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:谭嘉俊
我的简书:谭嘉俊
我的CSDN:谭嘉俊