RxJava的使用
1. 前言
现在RxJava越来越火,面对新技术,不用盲目追求,也不要有排斥心里。理解了它,再去做决择。本文是针对RxJava2.0的使用。
2. RxJava简介
一个实现异步的第三方库
3. 观察者模式
RxJava是基于观察者模式模式来实现的,下面介绍观察者模式的四个重要概念
- 被观察者(Observable):当满足某些条件时,它会通过发送事件的方式通知观察者
- 观察者(Observer):用于监听被观察者,处理被观察者发起的事件
- 订阅(subscribe): 在被观察者和观察者之间建立联系
- 事件:用于被观察者通知观察者,由被观察者发起,观察者接收处理
四者的关系如下图所示:

不过RxJava中的观察者模式,跟传统的观察者模式是反着的。传统的是观察者订阅被观察者,而RxJava中是被观察者去订阅观察者,这是为了方便构建被观察者时的链式调用,其实订阅就是建立两者的联系,所以先后顺序并不重要。RxJava中的观察者模式如下:

4. RxJava的使用
4.1 导入
在Android中使用RxJava还需要导入RxAndroid,它们的github和依赖地址如下:
https://github.com/ReactiveX/RxJava/
https://github.com/ReactiveX/RxAndroid
compile 'io.reactivex.rxjava2:rxjava:2.1.12'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
4.2 RxJava的Hello World
// 1.创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// 发起事件
emitter.onNext("Hello ");
emitter.onNext("World!");
emitter.onComplete();
}
});
// 2.创建观察者
Observer observer = new Observer() {
// 订阅时回调
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
// 处理被观察者发布的next事件
@Override
public void onNext(Object value) {
Log.e(TAG, "onNext: " + value);
}
// 处理被观察者发布的error事件
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
// 处理被观察者发布的complete事件
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
// 3.进行订阅
observable.subscribe(observer);
可见使用RxJava需要有如下三步
- 创建被观察者,并重写subscribe方法,定制被观察者发起的事件
- 创建观察者,重写相关方法,用于处理被观察者发起的事件
- 进行订阅,建立被观察者和观察者之间的关系
运行会输出

我们来分析下为什么这些log是这么输出的
首先,我们调用了被订阅者的subscribe方法和订阅者建立联系时,会先回调观察者的onSubscribe方法,表示订阅成功。
然后,会调用被订阅者的subscribe方法,去发布事件,在该方法里,调用了ObserableEmitter的onNext和onComlete方法,顾名思义,Observable就是一个被观察者的发射器,用于发布事件。所以调用的onNext、onComplete会触发观察者中的相应回调方法。
另外,说明下观察者中几个回调方法的作用
- onSubscribe:当被观察者订阅观察者时被调用,表示订阅成功
- onNext:普通事件,当被观察者发射next事件时被调用
- onError: 异常事件,当观察者发射error事件时被调用,之后的事件不接收
- onComplete: 完成事件,当观察者发射complete事件时被调用,之后的事件不接收
注意:OnError和onComlete事件互斥,代码上控制。就算一起调用,也不一定有问题。
如图所示:

4.3 被观察者的变形
其实除了上面通过create方法来创建被观察者Observable外,还有其他简便方式创建Observable。比如just:
Observable.just("Tom", "Josan", "Jim")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext name:" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});

可以看到,just操作,其实就是代替了create方式创建了Observable对象,且依次用just中的参数,调用了onNext()方法,相当于如下代码:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("Tom");
emitter.onNext("Josan");
emitter.onNext("Jim");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext name:" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
4.4 观察者的变形
被观察者Observable中subscribe有多个重载方法,如下所示:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) { }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) { }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 其他重载的方法,都会调用这个方法
public final void subscribe(Observer<? super T> observer) {}
其实其他的subscribe方法最后都会调用到最后一个subscribe方法,即参数为观察者Observer对象的重载方法,其实从其他方法的参数名就可以发现,它们只是处理了被观察者的部分事件,比如空参的subscribe方法表示不处理被观察者发出的任何事件,一个参数的subscribe方法,只关心了被观察发出的onNext事件,其他的也类似,最后会根据参入的参数去封装成一个观察者Observer对象,最后调用subscribe(Observer<? super T> Observer)方法。我们来看看最长的一个重载方法的实现:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
// 给传入的参数,封装成一个观察者对象
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
// 传入观察者对象,调用subscribe方法
subscribe(ls);
return ls;
}
4.5 事件的中止
既然被观察者可以发布事件让观察者处理,那观察者应该也可以不去处理被观察的发布事件。我们来看观察者的回调方法onSubscribe:
@Override
public void onSubscribe(Disposable d) {
}
该回调方法的参数Disposalbe对象的dispose方法就是用于中止观察者处理被观察者发布的事件,但是被观察者的事件还是可以正常发布的,只是观察者不进行处理。我们可以在onSubscribe方法中存储下Disposalbe对象,然后再需要的时候调用dispose来中止观察者去处理被观察者发布的事件,代码如下,:
/**
* 测试中止观察者处理被观察者发布的事件
*/
private void testDisposable() {
Observable.create(new ObservableOnSubscribe<Object>() {
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.e(TAG, "subscribe: 1");
emitter.onNext("1");
Log.e(TAG, "subscribe: 2");
emitter.onNext("2");
Log.e(TAG, "subscribe: 3");
emitter.onNext("3");
Log.e(TAG, "subscribe: 4");
emitter.onNext("4");
Log.e(TAG, "subscribe: 5");
emitter.onNext("5");
emitter.onComplete();
}
}).subscribe(new Observer<Object>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
mDisposable = d;
}
@Override
public void onNext(Object value) {
Log.e(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.e(TAG, "onNext: dispose");
// dispose方法会将被观察者和观察者断开联系,从而导致观察者接受不到被观察者的事件
// 注意,该方法并不会组织被观察者发送事件,只是阻止观察者接收事件
mDisposable.dispose();
Log.e(TAG, "onNext: dispose:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
}

当我们处理了两个消息以后,调用Disposable的dispose方法,虽然subscribe中继续调用了onNext(3/4/5)发布事件,但是观察者的onNext方法并没有被触发。这样就完成了观察者中止处理被观察者发布的事件。如下所示:

4.6 操作符
RxJava提供了强大的操作符,方便我们对被观察者发起的事件,进行转换。
4.6.1 map
map的作用是对被观察者发起的每个事件,按照指定的规则,进行变换,这样观察者处理的事件就不是被观察者发起的事件了,而是经过map转换以后的事件。如下图所示:

举个例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(59);
e.onNext(65);
e.onNext(85);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
String result = "";
if (integer < 60) {
result = "未及格";
} else if (integer < 80) {
result = "及格";
} else {
result = "优秀";
}
return result;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "学生成绩为:" + s);
}
});

我们发起的明明是59、65、85这样的整数类型的事件,但是经过map操作符的转换,观察者接收到的就是未及格、及格、优秀这样字符类型的事件了。
4.6.2 flatMap
flatMap是一个更复杂的操作符,它可以把
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final String s) throws Exception {
// 利用原始被观察Observable发布出来的事件,重新构造一个新的被观察者Observable对象,并返回
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// 新的观察者Observable需要发布的事件
emitter.onNext("A-" + s);
emitter.onNext("B-" + s);
emitter.onNext("C-" + s);
}
// 延迟100ms,是为了让旧的被观察者的事件发送出来,验证flatMap的无序性
}).delay(100, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String result) throws Exception {
Log.e(TAG, "accept result: " + result);
}
});

可以看到,旧的被观察者Observable对象每发布一个事件,都会调用一次apply方法,在该方法里可以拿到旧的被观察发布出来的事件,然后去构造一个新的被观察者的对象,并返回。
所以,观察者中接受到的消息,其实是新的被观察者发出的事件。
另外,flatMap是无序的,如果想有序,请使用concatMap转换符。
flatMap的原理,如下图所示:

4.7 线程控制
RxJava提供了强大的线程控制,用于执行异步操作。
默认情况下,被观察者发起事件的线程和观察者处理事件的线程,在同一个线程,如下:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e(TAG, "被观察者发起事件的线程 thread name:" + Thread.currentThread());
e.onNext("josan");
}
}).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "观察者处理事件的线程 thread name:" + Thread.currentThread());
}
});

我们在主线程中发起的,所以也就是在主线程中处理的。
但是我们通常有需要到子线程中去做耗时操作,然后回到主线程去更新UI,如下图:

RxJava中内置的线程调度器就很好地解决了这个问题,看如下的代码:
Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e(TAG, "被观察者发起事件的线程 thread name:" + Thread.currentThread());
e.onNext("josan");
}
})
// subscribeOn指定的是被观察者发送事件的线程,多次指定,只有第一次有效
.subscribeOn(Schedulers.io())
// observeOn指定的是观察者接受事件的线程,多次指定,每指定一次,就切换一次线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "观察者处理事件的线程 thread name:" + Thread.currentThread());
}
});

可以看出,被观察者发起事件的线程是一个子线程,而处理消息的线程还是在主线程中。
这里就要提到Observable的如下两个方法了
- subscribeOn():指定被观察者发送事件的线程,多次指定,只有第一次有效
- observeOn(): 指定观察者处理事件的线程,多次指定,每指定一次,就切换一次线程
这两个方法的参数都是线程调度器Scheduler对象,常用的有如下几种
- Scheduler.io(): 代表IO操作的线程,用于网络、读写文件、读取数据等io密集型操作
- Schedulers.computation():代码CPU计算密集型的操作,需要大量的计算操作
- Schedulers.newThread(): 代表一个常规新线程
- AndroidSchedulers.mainThread(): 代码Android主线程
RxJava内部使用线程池来维护以上线程,所以效率会比较高。
前面提到,observeOn方法可以多次指定观察者处理事件的线程,每指定一次,都会切换一次线程,我们来看看是它的用法。
这里还要提到一个doOnNext,该方法会在观察者的onNext方法调用之前回调,所以我们可以借助它来完成切换观察者处理事件的线程。代码如下:
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
e.onNext("josan");
Log.e(TAG, "被观察者: thread name:" + Thread.currentThread());
}
})
// 指定被观察者发布事件是在io线程
.subscribeOn(Schedulers.io())
// 指定处理事件在主线程
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "accept1: thread name:" + Thread.currentThread());
}
})
// 再次切换处理线程
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "accept2: thread name:" + Thread.currentThread());
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "观察者: thread name:" + Thread.currentThread());
}
});

可以看到,我们多次使用observeOn方法可以多次切换处理事件的线程。
所以我们使用Observable的subscribeOn()和observeOn()方法可以很方便地指定被观察者发布事件的所在线程和观察者的处理线程。
4.8 背压(Backpressure)
RxJava主要是用于解决异步的问题,所以就会存在一个问题,当被观察者发起事件的速度远超过其他线程中观察者处理事件的速度时,就可能有问题,为了模拟这种情况,我们让处理消息之前睡眠3s,看下面的代码:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 不断地发送事件
for(int i = 0; ; i++) {
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 让初期事件延时3s
SystemClock.sleep(3000);
Log.e(TAG, "accept: " + integer );
}
});


上面两张图分别是程序本来所占内存和被观察者不断发送消息时程序所占的内存,可以看出,内存暴增了。
那是什么原因,导致了这个问题呢?
其实,当被观察者发起事件和观察者处理事件不是在同一线程时,那么发起一件事件时,并不需要等待观察者处理完该事件就可以继续发起下一个事件,这些被发起了但还没被处理但事件会被存在一个集合里,等待观察者来取。示意图如下:

而怎么解决这个问题,这就要引入背压(Backpressure),它的作用就是为了控制被观察者发起事件的速度和数量。因为存储未被处理事件的集合是有限的,如果积累得过多,就会引起OOM问题。
所以解决方案就很明显了:
- 控制发布事件的数量,这样能减少集合中待处理事件的数量,比如我们需要在子线程中去更新进度条,我们没必要每一个数字都去更新,可以当能被5整除时更新。
- 控制发布事件的速度,这样能相对减缓发布和处理两种操作的速度差,比如实时更新股票的K线图,我们可以考虑延迟100ms更新一次。
4.9 Flowable
之前我们用到的被观察者都是Observable对象,观察者都是Observer对象。其实RxJava提供了支持背压的类Flowable和Subscriber,它们分别对应着别观察者和观察者,来看看它们的用法。
// 创建观察者
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
Log.e(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
// 创建被观察者
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
// 注意这句话
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o);
}
@Override
public void onError(Throwable t) { }
@Override
public void onComplete() {}
};
// 订阅
flowable.subscribe(subscriber);

其实跟Obserbable+Observer的组合差不多,不过有两点需要注意:
- 创建Flowable时,多了一个参数,它是用于选择背压的,也就是出现发布事件远快于处理事件时应该怎么处理,这里为BackpressureStrategy.ERROR,即抛出错误。
- Subscriber中订阅成功的回调方法onSubscribe的的参数不是Dispose对象,而是Subscription对象,它也提供了cancel方法用于Subscriber不再处理消息。另外,它还有个request方法,这个方法的作用是告诉被观察者能处理多少个事件,这里为Long值的最大数,所以肯定能处理发布的事件。
如果我们不在订阅成功的回调方法里写request方法,会怎么样?log如下:

可以看到,当没有调用request方法时,表示观察者没有处理事件的能力,所以只是发送了,并没有进行处理。
在订阅成功的回掉方法中通过FlowableEmitter对象的requested方法得到当前外部请求的数量,它也会随着发出的next事件自减。代码如下:
// 创建观察者
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
// 打印request
Log.e(TAG, "request: " + emitter.requested());
Log.e(TAG, "emit " + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
// 创建被观察者
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
// 设置观察者处理事件的个数为10
s.request(10);
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
// 订阅
flowable.subscribe(subscriber);

可以看到,requested初始为10,是因为如果发布事件和处理事件是一个线程,则request()即指定了requested的初始值,随着每次发送next事件,requested会减少1。

我们再来看看异步的情况,这时候指定观察者可以处理的事件个数为5,代码如下:
// 创建观察者
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
// 打印requested
Log.e(TAG, "requested: " + emitter.requested());
Log.e(TAG, "emit " + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
// 创建被观察者
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
// 指定观察者能处理的事件为5
s.request(5);
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() { }
};
// 订阅,
flowable
// 在子线程中发布事件
.subscribeOn(Schedulers.io())
// 在主线程中处理事件
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

可以看到,当我们通过request(5)指定了观察者能处理当事件为5时,尽管发起了10个next事件,但是只有5个得到了处理。也就是有10个next事件被加入到了待处理事件的集合中,但是只有前面的5个事件被取出来处理了。
另外,reuqested初始为128个,为什么不是5个呢?其实是这样的,requested是线程特有的,当发布事件和处理事件不是同一线程时,request方法只是指定当前线程的requested。而发布事件线程requested的大小由Flowable内部指定,为128个。
如图所示:

如果,我们在订阅成功的回调方法onSubscribe中先保存Subscription对像,然后再增加一个点击按钮,点击时再调用request方法,结果会怎么样?代码如下:
// 创建被观察者
Subscriber<Object> subscriber = new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
// 保存Subscription对象,用于再次调用request方法
subscription = s;
s.request(5);
}
// 无关代码
......
};
@OnClick(R.id.btn_request)
public void onRequest() {
subscription.request(1);
}
多次点击按钮,会发现每次点击按钮,即每调用一次request,观察者都会从待处理事件集合中取出1个事件来处理。

5. 总结
- RxJava是基于观察者模式实现的
- RxJava提供了just、range等多种方式创建被观察者
- RxJava提供了多个参数的Consumer来创建观察者
- RxJava提供了map、flatMap等多种变换操作符
- RxJava提供了Disposable和Subscription用于中止观察者处理事件的能力,Subscription还可以指定观察者处理事件的个数
- RxJava可以通过subscribeOn和observeOn指定发布事件和处理事件的线程
- RxJava提供了Flowable和Subscriber来支持背压
- 下面会再写一篇关于RxJava的应用和源码分析