RxJava 2.x 源码分析(一)
主要介绍
Rxjava
是如何利用观察者模式实现通信
依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
核心类和接口
Observable (被观察者)
被观察者
必须继承的抽象类
// T: `被观察者`发射的 item 的类型
public abstract class Observable<T> implements ObservableSource<T>
Observer (观察者)
观察者
必须实现的接口
public interface Observer<T>
Emitter (发射器)
用于
被观察者
发射信息给观察者
// T: 发射的 item 类型
public interface ObservableEmitter<T> extends Emitter<T>
例子使用到的的相关类
ObservableCreate
被观察者
的子类,Observable.create()
方法返回Observable
就是该类的实例
public final class ObservableCreate<T> extends Observable<T>
CreateEmitter(发射器)
ObservableCreate
的内部类,实现了ObservableEmitter
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable
Disposable
一个提供断开操作的接口,所有需要做断开操作的类都要实现该接口
/**
* Represents a disposable resource.
*/
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
ObservableOnSubscribe
一个简单的功能接口,只负责给
被观察者
提供订阅方法和传一个发射器实例给被观察者
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
从一个简单的例子查看源码:
private void RxJava() {
// 第一步:初始化'被观察者' observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
// 传一个事件给观察者观察
e.onNext(1);
e.onNext(2);
// 调用 onComplete 方法后调用接收不到后面的事件
e.onComplete();
// 向观察者传一个异常事件,并且观察者接收不到后面的事件
//e.onError(new NullPointerException("空指针"));
e.onNext(3);
e.onNext(4);
}
});
// 第二步:初始化'观察者' observer
Observer<Integer> observer = new Observer<Integer>() {
private int i;
private Disposable mDisposable;
/**
* 该方法是订阅操作中第一个调用的回调方法
* 提供了一个可主动切断的对象
*/
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
/**
* `被观察者`调用该方法并传给`观察者`一个 T 类型的 item
*/
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "Observer 接收到: " + integer);
i++;
if (i == 2) {
// Disposable 用来切断观察者与被观察者之间联系,被观察者之后的事件不再传给观察者
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "Observer onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "Observer onComplete");
}
};
// 第三步:订阅
observable.subscribe(observer);
}
第一步中create()
中进入源码:
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 判断传入的 ObservableOnSubscribe 对象是否为 null
ObjectHelper.requireNonNull(source, "source is null");
// 通过 RxJavaPlugins 组装并返回 ObservableCreate 的实例
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable
是一个抽象类,当我们使用 Observable.create
方法创建被观察者 observable
的时候实际上是创建一个 ObservableCreate
对象。
订阅方法 subscribe
中实际进行订阅操作的是一个抽象方法 subscribeActual
:
// Observable.java
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 实际进行订阅的操作
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
...
}
}
下面是 ObservableCreate
的实现:
// ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
// 创建发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用'观察者'的 onSubscribe 方法
observer.onSubscribe(parent);
try {
// source : 被观察者
// parent : 发射器
// 这里回调我们覆写的 Observable 的 subscribe 方法,开始发射事件操作
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual
订阅操作方法调用顺序:
观察者
的onSubscribe
方法 -> 被观察者
的subscribe
方法 -> 被观察者
发射操作onNext()
-> 观察者
的 onNext()
从源码中得知,我们需要覆写的subscribe
方法中得到的 e 实际上就是CreateEmitter
,当我们调用e.onNext(1)
的时候实际上就是调用 CreateEmitter
的onNext
// ObservableCreate.CreateEmitter.java
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 判断被观察者是否已断开了
if (!isDisposed()) {
// 回调我们覆写的观察者的 onNext 方法
observer.onNext(t);
}
}
从CreateEmitter
的onNext()
中我们可以知道只有当被观察者
没有断开的时候才会回调观察者
的onNext()
// ObservableCreate.CreateEmitter
@Override
public void onComplete() {
if (!isDisposed()) { //检测是否已断开
try {
// 把Complete事件发射给观察者
observer.onComplete();
} finally {
// 断开操作
dispose();
}
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// onError() 方法最终也会调用该方法
dispose();
}
return true;
}
return false;
}
@Override
public void dispose() {
// 断开操作
DisposableHelper.dispose(this);
}
很多人都知道 onError() 和 onComplete() 不能并存,从上面源码可以看到两个方法最终都会执行 dispose()
,所以无论执行哪个方法最终都会切断被观察者
和观察者
之间的联系,之后调用的方法都会失效。且由DisposableHelper
这个枚举类实现具体操作
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
// 用来标记断开状态
DISPOSED
;
/**
* Atomically disposes the Disposable in the field if not already disposed.
* @param field the target field
* @return true if the current thread managed to dispose the Disposable
*/
public static boolean dispose(AtomicReference<Disposable> field) {
/**
* 还记得 CreateEmitter 继承了 AtomicReference 吗
* 下面的操作属于原子性操作:
*/
// field: CreateEmitter
// 首次调用时获取的 current 为 null
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
// getAndSet 操作是把 d 的值赋给 field,但是返回旧值(null)
// 下一次执行该 dispose 方法时 field.get()的值就会变成 DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
}
这里我们注意到断开操作的核心就是让发射器
继承原子性引用类 AtomicReference
保存和改变状态,防止多线程并发出现异常
总结
-
被观察者
的subscribe()
方法就像是一个触发器(Trigger),调用后被观察者
先前准备好的一系列事件按顺序通过发射器传给观察者
-
如果调用了
onComplete()``onError()
或者观察者
主动调用dispose()
方法,被观察者
与观察者
之间的联系将被切断,观察者
不再接受到被观察者
后续发射的事件