Android RxJava使用分析(1)
源码地址:
RxJava和RxAndroid都是结合使用,地址如下:
RxJava的github地址:https://github.com/ReactiveX/RxJava
RxAndroid的github地址:https://github.com/ReactiveX/RxAndroid
一、RxJava是什么?
1.Rx(Reactive Extensions)是一个库,用来处理【事件】和【异步】任务,在很多语言上都有实现,RxJava是Rx在Java上的实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。它扩展了观察者模式以支持数据/事件序列,并添加了运算符,使您可以声明性地将序列组合在一起,同时抽象化了对低级线程,同步,线程安全和并发数据结构等问题的关注。
2、RxAndroid是基于RxJava适用于Android的封装;官方给的这句大概会让你明白适配封装了啥:More specifically, it provides a Scheduler that schedules on the main thread or any given Looper.
二、概念
1.Observable 被观察者 ,事件源。是一个抽象类。
------ 1.1 ObservableEmitter 发射器;
2.Observer 观察者,事件接收处理。是一个接口。
3.subscribe 订阅,把被观察者和观察者关联起来。
先看下面RxJavaAndroid .java就好理解了这几个概念了。
三、基础使用
1.集成依赖
build.gradle中dependencies 下依赖;代码如下(示例):
dependencies {
implementation "io.reactivex.rxjava3:rxjava:3.0.12"
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}
2.用法及分析
public class RxJavaAndroid {
public static void main(String[] args) {
doRxJava();
}
private static void doRxJava() {
//通过 Observable.create 创建被观察者,Observable是一个抽象类
Observable<String> observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
//发射器发送消息
emitter.onNext("hello world");
//通过发射器发射异常
//emitter.onError(new Throwable("模拟一个异常"));
//发射完成
emitter.onComplete();
}
}
);
//通过 new Observer 创建观察者;Observer是一个 interface
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
//第一个执行的方法
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext>>>" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError>>>" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
//通过被观察者 Observable 的 subscribe (订阅) 绑定观察者 Observer
observable.subscribe(observer);
}
}
代码执行结果如下:
image.png
查看Observable源码
Observable 是数据的上游,事件源,即事件生产者。
//一:接看代码 Observable.create()方法
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
//ObservableOnSubscribe是一个只有subscribe方法的接口
Objects.requireNonNull(source, "source is null");//判空
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
//二:继续向下看,进入RxJavaPlugins.onAssembly方法
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
//源码注释有介绍,Calls the associated hook function. hook java反射、钩子。底层管用黑科技。
//apply(f, source)的返回值依旧是Observable,里面不过多探究。
综上我们能发现
Observable.create(new ObservableOnSubscribe() { })
相当于
new ObservableCreate(new ObservableOnSubscribe() { }))
总结:事件的源就是 new ObservableCreate()对象,将 ObservableOnSubscribe 作为参数传递给 ObservableCreate 的构造函数。
查看Observer 源码
Observer 是数据的下游,即事件消费者。Observer 是个 interface。
查看源码,只有如下几个方法
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
查看observable.subscribe 源码:
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null"); //判空
try {
observer = RxJavaPlugins.onSubscribe(this, observer);//通过hook返回observer
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);//核心、真正实现订阅的方法,仔细看subscribeActual是一个抽象方法,所以我们需要去Observable的实现类ObservableCreate中去查看
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
ObservableCreate类中发现
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer); //创建发射器,并把观察者当参数传递给发射器
observer.onSubscribe(parent);//直接回调了observer的onSubscribe方法
try {
source.subscribe(parent);//source为Observable,事件源
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
由上述源码可以看到,在ObservableCreate类的subscribeActual方法下,调用了Observer(观察者即事件接收者)的onSubscribe,并把observer对象当做参数专递给了发射器CreateEmitter;同时将发射器CreateEmitter传递给了事件源。因此可以得出结论:
只有当 observable.subscribe(observer);时,发射器才会被创建,Observer才会被绑定onSubscribe; Observable的subscribe方法才会执行发射事件和数据;此时Observable和Observer的方法和回调都已经准备就绪,只待发送与接收。换言之,事件流是在订阅后才产生的。而 observable 被创建出来时并不生产事件,同时也不发射事件。
ObservableEmitter发射器是如何发生的呢?
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
CreateEmitter<>(observer);
CreateEmitter实现了ObservableEmitter,同时ObservableEmitter继承自Emitter,CreateEmitter 还实现了 Disposable 接口,这个 disposable 接口是用来判断是否中断事件发射的。
CreateEmitter的主要方法如下:
public void onNext(T t) {}
public void onError(Throwable t) {}
public boolean tryOnError(Throwable t) {}
public void onComplete() {}
前面我们说了 observer是个接口,与 observer的方法神似,几乎一一对应。
然后我们来分析一下CreateEmitter主要方法内都做了什么。
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) { //判断是否丢弃
observer.onNext(t); //调用Emitter的onNext,它会直接调用observer的 onNext
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t); //调用 Emitter 的 onError,它会直接调用 observer 的 onError
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose(); //执行完中断发射
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) { /判断是否丢弃
try {
observer.onComplete();//调用Emitter的onComplete,它会直接调用observer的 onComplete
} finally {
dispose(); //执行完中断发射
}
}
}
CreateEmitter 的 onError 和 onComplete 方法任何一个执行完都会执行 dispose()中断事件
发射,所以 observer 中的 onError 和 onComplete 也只能有一个被执行。
结合以上,我们知道了,当订阅成功后,数据 源ObservableOnSubscribe 开始生产事件 , 调用 Emitter 的 的 onNext ,onComplete 向下游发射事件,Emitter 包含了 observer 的引用,又调用了 observer onNext ,onComplete ,这样下游observer 就接收到了上游发射的数据。
四、调用流程分解
4.1 :将调用流程代码进行分解
public void test() {
//事件源source
ObservableOnSubscribe source = new ObservableOnSubscribe<Object>() {
/**
* 回调方法subcribe
* @param emitter 发射器参数
* @throws Exception
*/
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.i(TAG, "test==>create...subscribe...");
// TODO 第二步 发射器emitter调用onNext发送消息
emitter.onNext("hello world");
// TODO 通过发射器发射异常(如果有异常执行onError方法,就不会再执行onComplete,否则不执行onError而是执行onComplete)
// emitter.onError(new Throwable("模拟一个异常"));
// TODO 发射完成
emitter.onComplete();
}
};
Observable.create(source)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {//匿名内部类Observer,后面分析
@Override
public void onSubscribe(Disposable d) {
//TODO 第一步,发起订阅
Log.i(TAG, "test==>onSubscribe");
}
@Override
public void onNext(Object o) {
// TODO 第三步,
Log.i(TAG, "test==>onNext");
}
@Override
public void onError(Throwable e) {
//TODO 第四步(有异常执行onError不执行onComplete)
Log.i(TAG, "test==>onError");
}
@Override
public void onComplete() {
// TODO 第四步(无异常执行onComplete不执行onError)
Log.i(TAG, "test==>onComplete");
}
});
}
4.1.1 首先看下Observable.create()方法
/**
* //Observable.java
*
* Observable.create->返回ObservableCreate(source)
* ObservableCreate是Observable的子类
*
* ObservableOnSubscribe source事件源
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//为hook函数(钩子函数),实际返回的是ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
4.1.2 接着看ObservableCreate(可以看做自定义被观察者)类的实现,是被观察者Observable的子类
//被观察者Observable的子类
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//先记住此处的ObservableCreate的静态内部类CreateEmitter,后续分析
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//省略部分代码
......
}
}
4.1.3 执行Observable的subscribeOn方法,传递的参数为Schedulers.io(),内部的实现是个线程调度器IoScheduler,后面再对Scheduler进行分析
/**
* //Observable.java
* 此处scheduler=IoScheduler
*
* ObservableSubscribeOn 参数包含(传递source,ioscheduler)
*
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//此处RxJavaPlugins.onAssembly为hook函数,实际返回ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
4.1.4 接着看下 Observable的subscribe方法,执行订阅
/**
* //Observable.java
* 调用subscribe方法
*
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//RxJavaPlugins.onSubscribe 为hook函数
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//实际调用的是subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
调用Observable的subscribeActual方法
protected abstract void subscribeActual(Observer<? super T> observer);
可以发现是个抽象类,实际调用的是Observable的子类ObservableCreate的subscribeActual方法
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//上面我们提到的CreateEmitter发射器,持有匿名内部类对象observer
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用observer的onSubscribe方法
observer.onSubscribe(parent);
try {
//回调上面最初提到的事件源source的subscribe方法,执行订阅
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
上面创建了CreateEmitter发射器,在这里我们调用了observer.onSubscribe(parent),也就是我们创建的匿名observer类的onSubscribe方法。
source.subscribe(parent)最重要的方法,观察者和被观察者顺利会师,事件开始执行:
/**
* 回调方法subcribe
* @param emitter 发射器参数
* @throws Exception
*/
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.i(TAG, "test==>create...subscribe...");
// TODO 第二步 发射器emitter调用onNext发送消息
emitter.onNext("hello world");
// TODO 通过发射器发射异常(如果有异常执行onError方法,就不会再执行onComplete,否则不执行onError而是执行onComplete)
// emitter.onError(new Throwable("模拟一个异常"));
// TODO 发射完成
emitter.onComplete();
}
看下CreateEmitter的onNext和onComplete方法,就是判断任务是否取消,没有取消则调用观察者的onNext和onComplete方法
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
4.1.5 执行流程总结:
1 create方法传返回了一个对象是ObservableCreate,ObservableCreate的构造方法中有一个ObservableOnSubscribe对象,也就是我们使用create时候创建的匿名内部类对象。
2 p.subscribe(o)实际上调用了ObservableCreate的subscribeActual方法
3 subscribeActual中首先调用了 observer的onSubscribe方法,紧接着调用了source.subscribe(parent)也就是ObservableOnSubscribe的subscribe方法,事件开始执行
4 subscribe方法中调用CreateEmitter的onNext方法,这个方法调用了observer的onNext方法,观察者对事件进行反应.
5 subscribe方法中调用CreateEmitter的onComplete方法,这个方法调用了observer的onComplete方法,整个流程结束。
————————————————
参考:
https://blog.csdn.net/Kern_/article/details/115701475
https://segmentfault.com/a/1190000020062497?utm_source=tag-newest
https://blog.csdn.net/qq_40270270/article/details/113726399