RxJava 基本使用
2018-05-30 本文已影响0人
翻滚吧王咸鱼
对RxJava 的学习,做一个记录,为以后的面试复习用. 我看一些RxJava 的介绍,个人感觉
RxJava 这个作者讲的简单明了.
通过水管来讲
老规矩 先放依赖
要在Android中使用RxJava2, 先添加Gradle配置:
//解释一下 implementation 跟compile 的区别
//compile依赖的确实可以做到依赖传递,但是AS 3.0开始推荐使用implementation取代了compile,
//依赖传递失效了. 而 ##implement## 的意思是将该依赖隐藏在内部,而不对外部公开. 在 app //mudule 中//使用 implement 依赖的第三方库, 在其他 mudule 是无法调用的,## compile ##android //studio 3.0 版本后废弃该指令 改用 api 代替, api 完全等同于之前的 compile 指令,
// 也就是普通的依赖, //第三方库在 mudule 中依赖后其他 mudule 都可以使用该库.官方推荐在不影响的前提下优先使用 //implement 指令依赖.
implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
下面就简单使用的例子
//Observable 被观察者 subscribe订阅 需要做的事
// create() 是 RxJava 最基本的创造事件序列的方法
Observable.create(new ObservableOnSubscribe<Integer>() {
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// ObservableEmitter类对象产生事件并通知观察者
// ObservableEmitter类介绍
// a. 定义:事件发射器
// b. 作用:定义需要发送的事件 & 向观察者发送事件
emitter.onNext(1); //发射第一个事件
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
//观察者Observer subscribe是建立联系
.subscribe(new Observer<Integer>() { //事件的响应拿到结果
// 观察者接收事件前,默认最先调用复写 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.e("----->", "subscribe");
}
// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onNext(Integer integer) {
Log.d("----->", "" + integer);
}
// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onError(Throwable e) {
Log.d("----->", "异常");
}
// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onComplete() {
Log.d("----->", "complete");
}
});
// 说明:Subscriber类 = RxJava 内置的一个实现了 Observer 的抽象类,对 Observer 接口进行了扩展
<-- Observable.subscribe(Subscriber) 的内部实现 -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 步骤1中 观察者 subscriber抽象类复写的方法,用于初始化工作
onSubscribe.call(subscriber);
// 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
// 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
}
image.png
注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.
ObservableEmitter:
Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
Disposable:
这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.
被观察者 Observable的subscribe()具备多个重载的方法
* public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
/**
* 可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
* 即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件
*/
private void Dome2() {
Observable.create(new ObservableOnSubscribe<Integer>(){
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("----->", "开始发射:");
emitter.onNext(1); //发射第一个事件
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 1\. 定义Disposable类变量
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d("----->", "接收到事件开始了" );
// 2\. 对Disposable类变量赋值
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
Log.d("----->", "接收到事件"+integer );
if (integer == 2) {
// 设置在接收到第二个事件后切断观察者和被观察者的连接
mDisposable.dispose();
Log.d("----->", "已经切断了连接:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d("---->", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d("---->", "对Complete事件作出响应");
}
});
}
简单学习到这里