RxJava2.0实用操作符总结及原理简析
本文首发于“随手记技术团队”公众号
大概从2015年开始,RxJava1.0开始快速流行起来,短短两年时间,RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。RxJava虽然好用,但伴随而来的是不可避免的学习成本,为了让大家快速的了解RxJava的来龙去脉以及快速上手使用,特地总结该篇文章。本文将详细讲解如何快速理解RxJava的操作符,并从源码角度来分析RxJava操作符的原理。
RxJava的优点
简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它。
我总结它的优点主要有两个方面:
- 简洁,免除传统异步代码逻辑中的callback hell
- 增加业务逻辑代码的可读性
关于第一点大家应该都会认同,关于第二点可能有人会有疑惑,因为很多人觉得RxJava大量不明所以的操作符会让代码的可读性变得更差,其实产生这种印象恰恰就是因为没有掌握RxJava操作符的使用和原理所导致的。
比如随手记项目中绑定用户QQ账号的业务逻辑,这段逻辑的代码涉及三个异步接口,两个是QQ登录SDK的,一个是随手记后台的,在使用RxJava重构前,这段代码使用了3个AsyncTask,也就是三个嵌套的回调,代码复杂,可读性非常差。而改造之后,它变成了下面这样子
如果你对这里面的几个RxJava操作符比较熟悉的话,你会迅速了解我这段代码做了什么事情,而且不用再去梳理一堆嵌套回调了,这就是RxJava带来的可读性。
所以,学习RxJava,理解和掌握操作符是不可避免的第一步。
RxJava2.0与RxJava1.0的关系
从RxJava1.0到RxJava2.0,基本思想没有变化,但RxJava2.0按照Reactive-Streams规范对整个架构进行了重新设计,并变更了Maven仓库依赖地址和包名。所以现在RxJava的github网站中,RxJava1.0和RxJava2.0是两个独立的分支,不相互兼容 ,也不能同时使用,而且RxJava1.0再过一段时间也将不再维护。所以,目前还使用RxJava1.0的,建议尽早切换到RxJava2.0,而如果没有接触过RxJava1.0,直接使用和学习RxJava2.0就可以了。如果想了解RxJava1.0和RxJava2.0的详细区别,请参考官方文档。
为行文方便,从此处开始,本文使用Rx来表示RxJava2.x。
Rx的操作符有哪些
刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了。
下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符
从上图可以看出,Rx的操作符主要十个大类别,每个类别下常用的操作符也就三五个左右,所以只要掌握这些,你就可以应付大部分的业务场景了。
如何快速理解一个Rx操作符
提到Rx操作符,相信很多人都会对描述Rx操作符的花花绿绿的宝石图有很大印象。
Rx宝石图
要快速理解Rx操作符,看懂宝石图是个快捷有效的方式,现在我们就来详细分析一下构成宝石图的各个主要元素。
首先,我们有必要回顾一下Rx中的几个主要的基类
-
io.reactivex.Flowable
: 事件源(0..N个元素), 支持 Reactive-Streams and 背压 -
io.reactivex.Observable
:事件源(0..N个元素), 不支持背压 -
io.reactivex.Single
: 仅发射一个元素或产生error的事件源, -
io.reactivex.Completable
: 不发射任何元素,只产生completion或error的事件源 -
io.reactivex.Maybe
: 不发射任何元素,或只发射一个元素,或产生error的事件源 -
Subject
: 既是事件源,也是事件接受者
可以看到Rx中最重要的概念就是事件源
了,基本上所有的操作符都是针对事件源来进行一些转换、组合等操作,而我们最常用的事件源就是Observable
了。
本文中我们就以Observable
事件源为例来讲解Rx的操作符,Observable
发射的事件我们统一称之为item。首先我们需要详细了解一下宝石图中各个图像元素的含义:
-
—>
:Observable
的时间线,从左至右流动 -
★
:星星、圆、方块等表示Observable
发射的item -
|
:时间线最后的小竖线表示Observable
的事件流已经成功发射完毕了 -
X
:时间线最后的X符合表示由于某种原因Observable
非正常终止发射,产生了error
上面几种元素组合在一起代表一个完整的Observable
,也可以称为源Observable
-->
方向朝下的虚线箭头表示以及中间的长方框表示正在对上面的源Observable
进行某种转换。长方框里的文字展示了转换的性质。下面的Observable
是对上面的源Observable
转换后的结果。
掌握了宝石图的含义,我们就可以根据某个操作符的宝石图快速理解这个操作符了。举几个例子:
1. map
可以看到,这幅图表达的意思是一个源Observable
先后发射了1、2、3的三个item,而经过map
操作符一转换,就变成了一个发射了10、20、30三个item的新的Observable
。描述操作符的长方框中也清楚的说明了该map
操作符进行了何种具体的转换操作(图中的10*x只是一个例子,这个具体的转换函数是可以自定义的)。
于是,我们就很快速地理解了map
操作符的含义和用法,简单来讲,它就是通过一个函数将一个Observable
发射的item逐个进行某种转换。
示例代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"\n" );
}
});
输出结果:
2. zip
根据
zip
的宝石图,可以知道zip操作符的作用是把多个源Observable
发射的item通过特定函数组合在一起,然后发射组合后的item。从图中还可以看到一个重要的信息是,最终发射的item是对上面的两个源Observable
发射的item按照发射顺序逐个组合的结果,而且最终发射的1A
等item的发射时间是由组合它的1
和A
等item中发射时间较晚的那个item决定的,也正是如此,zip
操作符经常可以用在需要同时组合处理多个网络请求的结果的业务场景中。示例代码:
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "\n");
}
});
输出结果:
3. concat
从宝石图可以看出,
concat
操作符的作用就是将两个源Observable
发射的item连接在一起发射出来。这里的连接指的是整体连接,被concat
操作后产生的Observable
会先发射第一个源Observable
的所有item,然后紧接着再发射第二个源Observable
的所有的item。示例代码:
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
}
});
输出结果:
大部分操作符都配有这样的宝石图,通过官方文档或者直接在Rx源码中查看JavaDoc就可以找到,不再过多举例。你也可以在rxmarbles这样的网站上查看更多可以动态交互的宝石图。
Rx操作符的原理
要了解操作符的原理,肯定要从源码入手喽。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码。
Rx的源码目录结构是比较清晰的,我们先从Observable.create
方法来分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
create
方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
代码很简单,第一行判空不用管,第二行调用RxJavaPlugins
的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即ObservableCreate
对象,ObservableCreate
是Observable
的子类,实现了Observable
的一些抽象方法比如subscribeActual
。事实上Rx的每个操作符都对应Observable
的一个子类。
这里create
方法接受的是一个ObservableOnSubscribe
的接口实现类:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
通过注释可以知道这个接口的作用是通过一个subscribe
方法接受一个ObservableEmitter
类型的实例,俗称发射器。
Observable.create
方法执行时,我们传入的就是一个ObservableOnSubscribe
类型的匿名内部类,并实现了它的subscribe
方法,然后它又被传入create
方法的返回对象ObservableCreate
,最终成为ObservableCreate
的成员source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
接着我们来看Observable
的subscribe
方法,它的入参是一个Observer
(即观察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
最终它会调用它的子类ObservableCreate
的subscribeActual
方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在subscribeActual
里首先创建了用于发射事件的CreateEmitter
对象parent
,CreateEmitter
实现了接口Emitter
和Disposable
,并持有observer
。
这段代码的关键语句是source.subscribe(parent)
,这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")
会被调用。细心的同学也会注意到这行代码之前,parent
先被传入了observer
的onSubscribe()
方法,而在上面我们说过,observer
的onSubscribe()
方法接受一个Disposable
类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer
的onSubscribe()
,给了我们调用CreateEmitter
的解除订阅的方法dispose()
的机会。
继续来看CreateEmitter
的onNext()
方法,它最终是通过调用observer
的onNext()
方法将事件发射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}
至此,Rx事件源的创建和订阅的流程就走通了。
下面我们从map
操作符来入手看一下Rx操作符的原理,map
方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
map
方法接受一个Function类型的参数mapper
,返回了一个ObservableMap
对象,它也是继承自Observable
,而mapper
被传给了ObservableMap
的成员function
,同时当前的源Observable
被传给ObservableMap
的成员source
,进入ObservableMap
类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
可以看到这里用到了装饰者模式,ObservableMap
持有来自它上游的事件源source
,MapObserver
持有来自它下游的事件接收者和我们实现的转换方法function
,在subscribeActual()
方法中完成ObservableMap
对source
的订阅,触发MapObserver
的onNext()
方法,继而将来自source
的原始数据经过函数mapper
转换后再发射给下游的事件接收者,从而实现map这一功能。
现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
执行代码时,自上而下每一步操作符都会创建一个新的Observable
(均为Observable
的子类,对应不同的操作符),当执行create
时,创建并返回了ObservableCreate
,当执行map
时,创建并返回了ObservableMap
,并且每一个新的Observable
都持有它上游的源Observable
(即source
)及当前涉及到的操作函数function
。当最后一步执行订阅方法subscribe
时会触发ObservableMap
的subscribeActual()
方法,并将最下游的Observer
包装成MapObserver
,同时该方法又会继续调用它所持有ObservableCreate
的订阅方法(即执行source.subscribe
),由此也会触发ObservableCreate
的subscribeActual()
方法,此时我们的发射器CreateEmitter
才会调用它的onNext()
方法发射事件,再依次调用MapObserver
的操作函数mapper
和onNext()
方法,最终将事件传递给了最下游的Observer
的onNext()
方法。
我简单的将这段逻辑用下面这幅图来表示
操作符lift
和compose
lift
和compose
在�Rx中是两个比较特殊的操作符。
lift
让我们可以对Observer
进行封装,在RxJava1.0中大部分变换都基于lift
这个神奇的操作符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
ObjectHelper.requireNonNull(lifter, "onLift is null");
return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}
lift
操作符接受一个ObservableOperator
对象
/**
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}
看注释可以知道,这是一个将下游订阅者包装成一个上游订阅者的接口。类似Map操作符中的MapObserver。
而compose
操作符让我们可以对Observable
进行封装
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
wrap
方法如下,仅仅是走了RxJavaPlugins
的流程
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
compose
方法接受一个ObservableTransformer
对象
/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> {
/**
* Applies a function to the upstream Observable and returns an ObservableSource with
* optionally different element type.
* @param upstream the upstream Observable instance
* @return the transformed ObservableSource instance
*/
@NonNull
ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}
ObservableSource
即为我们的基类Observable
继承的唯一接口。看注释可以知道,ObservableTransformer
是一个组合多个Observable
的接口,它通过一个apply()
方法接收上游的Observable
,进行一些操作后,返回新的Observable
。
这里组合多个Observable
的意思其实就是组合多个操作符,比如我们经常会需要在使用Rx进行网络异步请求时进行线程变化,这个操作一般都是差不多的,每次都写会比较烦,这时我们就可以使用compose
把常用的线程变换的几个操作符组合起来
private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
protected void testCompose() {
getNetObservable()
.compose(schedulersObservable)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append(s);
}
});
}
关于compose
的典型应用,大家有兴趣还可以去看一下开源项目RxLifecycle,它就是巧妙地利用compose
操作符来解决了使用Rx可能会出现的内存泄露问题。
Rx操作符的应用场景
说了这么多,其实我们最关心的还是Rx操作符的应用场景。其实只要存在异步的地方,都可以优雅地使用Rx操作符。比如很多流行的Rx周边开源项目
而针对自己想要实现的功能情景,如何去选择特定的操作符,官网的文档中也列出了一些指导——Rx操作符决策树。
当然除了这些,我们在开发项目时,还会有各种具体的业务场景需要选择合适的操作符,这里我总结了一些经常遇到的场景以及适合它们的操作符
只要我们理解了Rx操作符的原理,熟练掌握了一些使用频率较高的操作符,就能够在以上场景中轻松地使用,不再让自己的代码被复杂的业务逻辑搞得混乱。
以上就是本文的全部内容,关于Rx还有很多东西值得深入地学习研究,后续有机会再跟大家分享更多Rx的使用心得。