Rxjava源码解析--基本用法源码分析
2019-03-26 本文已影响4人
二妹是只猫
观察者(订阅)模式
想了解rxjava就绕不过观察者模式,在观察者(订阅)模式文中对该模式有一个基本的介绍。
创建于使用
- 第一步创建被观察者(订阅者):
Observable observable = Observable.create(new
ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter)
throws Exception {
emitter.onNext("hello");
}
});
- 第二步创建观察者:
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
System.out.println("{}"+o);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
- 订阅:
observable.subscribe(observer);
这样一个简单的Rxjava就创建成功并成功订阅了,当运行上方代码时,订阅者Observer的onNext方法会被调用并接收到“hello”
现在来看看观察者、被观察者是如何被创建并关联上的
- 创建被观察者
Observable.create(ObservableOnSubscribe<T> source)
:
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
create方法返回一个Observable的子类ObservableCreate
,给其中ObservableOnSubscribe赋值。注意subscribeActual方法
,之后调用被观察者订阅观察者的subscribe
最终就是执行的这儿。
- 创建观察者对象Observer:
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
Observer很简单就是一个接口
- 最关键的一步,观察者于被观察者绑定
observable.subscribe(observer)
:
Observable:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
通过上面subscribeActual(observer)
这个抽象方法,调用到Observerble的实现类Observerblecreate
的subscribeActual(observer)
(果然如我们上面所说的):
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
...省略代码
}
这里创建了CreateEmitter
,它实现了ObservableEmitter接口,这里就将我们刚开始创建的被观察者的subcriibe(ObservableEmitter<Object> emitter)关联了起来。最终我们我们在Observable的subscribe方法中调用的emitter.onNext("hello")
就是它实现的:
到这里我们的rxjava示例就完整的运行完毕了,最终就是Observable的实ObservableCreate
与Observer
关联,并通过CreateEmitter
发送通知消息,1中调用到observer的onNext(t)
,这样一个简单的rxjava就实现了。