Rxjava(一)之流程分析与基本操作
前言
Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。
Rxjava是一个基于时间流,实现异步操作的库。
定义:Rxjava简单来说就是采用的观察者内模式来定义的,被观察者(Observable)通过订阅(Subscribe)按照一定顺序将事件发送给观察者(Observer),观察者按顺序接收事件并做出相应的响应动作。
首先我们引入Rxjava库
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
基本使用
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
integerObservable.subscribe(observer);
首先创建一个被观察者integerObservable 做为上游,接受创建一个观察者observer 做为下游接受integerObservable 传来的消息,最后调用被观察者的订阅方法subscribe()进行订阅。
当然,Rxjava最经典的还是如下的基于事件流的链式调用结构。有着逻辑简洁,代码优雅,使用简单等优点。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG,"Observable onSubscribe");
emitter.onNext(1);
emitter.onNext(2);
Log.d(TAG,"start send onComplete");
emitter.onComplete();
Log.d(TAG,"end send onComplete");
emitter.onNext(3);
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG,"Observer onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG,"Observer onNext: "+ integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"Observer onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG,"Observer onComplete");
}
});
在上述代码中我们加入了一些逻辑和日志,运行上述代码会得到下面结果:
Observer onSubscribe
Observable onSubscribe
Observer onNext: 1
Observer onNext: 2
start send onComplete
Observer onComplete
end send onComplete
通过分析上面的结果,我们不难发现,Rxjava的流程是从被观察者的subscribe()方法开始,然后到观察者的onSubscribe()方法,通过发射器(emitter)发送事件,最终在发射完onComplete()事件后结束,这里我们注意到,最后发射的两个事件,下游并没有收到,这是因为发送完onComplete()事件后,观察者的任务就完成了。
如果我们这里在发送onError()事件,会抛出UndeliverableException异常,但是先调用onError()再调用onComplete()事件是可以的。由于篇幅关系,这里我就不贴源码了。
这里我们介绍一个简单的观察者Consumer。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,"accept: "+integer);
}
});
accept: 1
accept: 2
我们可以看到,简易的观察者也能接收到被观察者发送的消息。
下面我们介绍一下Disposable对象,它可以通过disposable()方法切断观察者和被观察者之间的联系,虽然被观察者还可以继续发送事件,但是观察者已经接收不到。具体示例代码和结果如下:
Disposable disposable;
public void r01(View view) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG,"Observable onSubscribe");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
disposable =d;
Log.d(TAG,"Observer onSubscribe");
}
@Override
public void onNext(Integer integer) {
if (integer == 2) {
disposable.dispose();
}
Log.d(TAG,"Observer onNext: "+ integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"Observer onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG,"Observer onComplete");
}
});
}
Observer onSubscribe
Observable onSubscribe
Observer onNext: 1
Observer onNext: 2
本文对RxJava的进行了基本介绍,对基本用法进行了讲解。