RxJava入门
前言
- 什么是RxJava? 简单来说,RxJava是基于观察者模式,提供便捷的异步操作的一套API。
- RxJava好在哪?它提供了一系列丰富的操作符,支持链式调用,可以便捷的进行线程的切换。
- 本文基于RxJava 2.2.2,是自己学习过程中的笔记,方便以后查阅使用
简介
RxJava最基本的两个元素:
1 Observable(被观察者)
2 Observer(观察者)
我们通过subscribe(订阅)便可使它们形成订阅关系。下面就看一下它们最基本的实现:
创建一个Observable:
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
}
});
创建一个Observer:
Observer observer =new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
订阅:
observable.subscribe(observer);
链式调用:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
以上,就可以简单的使用RxJava了。。。what? 辣鸡,写了点啥?
1 observable.subscribe(observer); 这个方法干了啥?
observable.subscribe(observer); //这个方法干了啥?
2 ObservableOnSubscribe. subscribe(ObservableEmitter<String> observableEmitter)这个方法啥时候调用,observableEmitter怎么来的?,干什么的?
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
//这个方法啥时候调用,observableEmitter怎么来的?,有什么用?
}
}
首先,我们从头开始看,Observable是个抽象类,其静态方法,Observable.create(ObservableOnSubscribe<T> source ):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");//对传进来的source参数判空,如果是null,则抛出异常。
return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}
看看onAssembly这个方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
return f != null ? (Observable)apply(f, source) : source;//f 初始值是null的,也就是说这个方法返回值便是传进来的source参数,至此Observable创建完成
}
至此,Observable创建流程便清楚了,大体就是通过调用 Observable.create(ObservableOnSubscribe<T> source )方法。new一个ObservableCreate对象并返回,接下来再看看observable.subscribe(observer) 这个方法
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");//判空
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
this.subscribeActual(observer);//通过上面我们知道这个实际上调用了ObservableCreate.subscribeActual(observer)
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}
通过上面可以看出,实际上调用了ObservableCreate.subscribeActual(observer),再看看 ObservableCreate这个类
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//就是create中的ObservableOnSubscribe对象
}
/**
*实现了subscribeActual这个抽象方法
*/
protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);//observer便是我们创建的observer对象
try {
this.source.subscribe(parent);//也就是create中的ObservableOnSubscribe对象的suscribe方法
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}
}
...
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;//我们创建的Observer
}
public void onNext(T t) {
if (t == null) {
this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
if (!this.isDisposed()) {
this.observer.onNext(t);
}
}
}
public void onError(Throwable t) {
if (!this.tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!this.isDisposed()) {
try {
this.observer.onError((Throwable)t);
} finally {
this.dispose();
}
return true;
} else {
return false;
}
}
public void onComplete() {
if (!this.isDisposed()) {
try {
this.observer.onComplete();
} finally {
this.dispose();
}
}
}
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
public void setCancellable(Cancellable c) {
this.setDisposable(new CancellableDisposable(c));
}
public ObservableEmitter<T> serialize() {
return new ObservableCreate.SerializedEmitter(this);
}
public void dispose() {
DisposableHelper.dispose(this);
}
public boolean isDisposed() {
return DisposableHelper.isDisposed((Disposable)this.get());
}
public String toString() {
return String.format("%s{%s}", this.getClass().getSimpleName(), super.toString());
}
}
}
以上,便可清楚,当调用observable.subscribe(observer) ,observer的onSubscribe(ObservableEmitter<T> observableEmitter)方法便会调用,同时create中的ObservableOnSubscribe对象的suscribe方法也会调用
总结
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
//在observable.subscribe(observer);执行后执行,
//observableEmitter里面的Observer便是我们创建的Observer,其onNext,onError,
// onComplete均会交给我们创建的Observer执行
}
});
Observer observer =new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
//在observable.subscribe(observer);执行后执行
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
observable.subscribe(observer);订阅方法执行后,observer的onSubscribe(Disposable disposable)方法会调用,ObservableOnSubscribe的subscribe(ObservableEmitter<String> observableEmitter)方法会调用,其中observableEmitter便是由我们创建的observer包装而成的,其发送的事件会被我们创建的Observer收到(disposable.dispose()之后的收不到)。以上便是个人学习RxJava的一点理解。