大话RxJava:二、轻松学源码之基础篇
写在前面
前篇文章简单介绍了一下RxJava的概念与基本使用,本来准备继续说一下RxJava的进阶使用,包括一些复杂操作符的使用与线程控制的相关内容。然后想了想,不能这样急躁地去学如何使用,应该先稍微明白它的一些过程,后面学习起来思路更加清晰,这也是我之前学习RxJava的一个误区所在。
所以今天这篇文章,打算先认识一下基本的常见的两种操作符及其使用,然后采用图文的模式详细介绍一下RxJava的基础源码,主要包括create()与subscribe()这两个方法到底做了什么事情。
有了这两个基础后,后期学大量复杂且重要的操作符及其原理就有了基本的认知,至少学到复杂的地方不慌。而采用图文模式讲解源码看起来也不烦躁,轻轻松松得去学习。
下面是本文的目录:
- 写在前面
- 初识操作符
- Observable.just()
- Observable.from()
- 基础源码
- create()
- subscribe()
- 结语
- 参考资料
- 项目源码
初识操作符
所谓操作符(Operators),简单来说就是一种指令,表示需要执行什么样的操作。Rx中的每种编程语言实现都实现了一组操作符的集合。RxJava也不例外。
RxJava中有大量的操作符,比如创建操作符、变换操作符、过滤操作符等等,这些操作符要全部讲解完几乎是不可能也没必要的事情。所以我们只介绍常见的、有用的、重要的操作符。其他的如果用到直接到文档查找就行了。
下面就针对前篇文章的创建(create)来说明一下另外两种常见的创建操作符。先来回顾一下create()操作符,比较简单,这里就不解释了:
//创建Observable
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext ---> ", s);
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
}
});
Observable.just()
首先给出定义:
Just操作符是创建一个将参数依次发送出来的Observable
具体一点来说就是, just() 中会接收1~9个参数,它会返回一个按照传入参数的顺序依次发送这些参数的Observable。
这样说可能还是不够清晰,所以画个图来看:
just流程图从图中可以看出,其实就是依次发送单个数据,它的具体写法是这样的,非常简单:
Observable.just("Hello","world");
//其实就相当于依次调用:
//subscriber.onNext("Hello");
//subscriber.onNext("World");
但是这里要注意一点,如果你传递null给just,它会返回一个发送null值的Observable,而不是返回一个空Observable(完全不发送任何数据的Observable)。后面会讲到,如果需要空Observable应该使用 Empty 操作符。
现在来看完整的代码,代码本身很简单,注意看Log日志:
//创建Observable
Observable.just("Hello", "World", null)
.subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
if (s == null) {
Log.i("onNext ---> ", "null");
}else {
Log.i("onNext ---> ", s);
}
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", "出错 --->" + e.toString());
}
});
Log
这里因为我们要打印字符串,所以不能为null,我就处理了一下,可以看到当发送 null 的时候,s确实等于null。
Observable.from()
尽管与just一样是创建操作符,但是from操作符稍微强大点。因为from操作符的作用是:
将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
注意,这里不再是发送单个对象,而是直接发送一组对象。为了与just对比,也来画个图描述一下:
from流程图它的具体写法是这样的,也非常简单:
String[] str = new String[]{"Hello", "World"};
//创建Observable
Observable.from(str);
这里由于篇幅关系,我就不贴完整代码了,跟just是类似的,十分简单。
基础源码
讲了两个简单但是常用的操作符后,我们回过头来看一下之前的实现RxJava的代码,这里我打上了Log日志,来看一下每个方法执行的顺序。
//创建Observable
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
Log.i("执行顺序 ---> ", " call ");
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext ---> ", s);
Log.i("执行顺序 ---> ", " subscribe onNext");
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
Log.i("执行顺序 ---> ", " subscribe onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", "出错 --->" + e.toString());
}
});
好了,来看一下Log日志:
执行顺序Log从图中可以看到,subscribe方法先执行,等执行完成后再执行call方法。
好了,这就是结论。先在脑子里产生个印象,方便后面追溯。
create()
先来看看Observable的create()方法做了些什么?Ctrl点进去看看:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
啥事没干,就是返回一个 Observable。
再看看它的构造函数,构造一下对象:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
再来看这个 hook.onCreate(f) 。 hook 是啥呢?
hook是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码。
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
注意是仅仅,所以它几乎没啥用处,完全可以忽略。来看 hook.onCreate(f) :
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
依然啥事没干,只是把 OnSubscribe 这个对象返回了一下。
OK,说到这里,虽然我一直在强调它“啥事没干”,其实仔细推敲,它还真做了三件事情:
- 返回了一个Observable(假设为ObservableA)
- 返回了一个OnSubscribe(假设为OnSubscribeA)
- 把返回的OnSubscribe在Observable构造函数中保存为Observable的 .onSubscribe 属性
说到这里,不知道大家看懂了没有。我第一次看这到这里的时候,还有略微有点模糊的,没关系,只要模糊那就画图理解:
create流程图这样看起来是不是轻松很多,create()就做了这样简单的事情,所以概括(但可能并不准确)地来说就是:
create()方法创建了一个Observable,且在这个Observable中有个OnSubscribe。
所以就画个简图就如下图所示这样,这个图要注意,之后还会扩展:
create简图好了,create方法就分析到这里,虽然有点啰嗦,但是已经十分详细了。
subscribe()
现在来看另外一个比较重要的操作 subscribe() ,在前篇文章中说过,这个是将观察者(Observer)与被观察者(Observable)联系到一起的操作,也就是产生一种订阅(Subcribe)关系。
跟前面的一样,先来看一下源码,点进去是这样的:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}
之前我们说过一句话:
实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。
在这里就能够看出,首先 if 中的语句意思是如果这个Observer已经是Subscriber类型,那就直接返回。如果不是的话 new了一个ObserverSubscriber ,再点进去看看:
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;
public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
}
果然,它还是转成了Subscriber类型,刚好印证了之前的话。所以为了方便起见,之后文章中,所有的观察者(Observer)我都用Subscriber来代替。 这是一个小插曲,注意一下就好。
好了,继续看 subscribe 源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
...
}
把一些暂时无关的代码省略掉来看,其实就是执行了一句 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 。
而这个 hook.onSubscribeStart 方法再点进去看看:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
可以看到,竟然直接返回了一个 onSubscribe ,由于之前说过这个hook没什么作用,直接删掉,那就等于整个 subscribe 做了一件事就是 onSubscribe.call(subscriber) ,当然这个call里面的参数subscriber是我们代码中传递进去的。
而onSubscribe在create源码解析中我们已经知道是新建 ObservableA 的一个属性,所以总结来说,subscribe()方法做的事情就是这样:
ObservableA.onSubscribe.call(subscriber);
而调用 call 方法,就是调用传入的参数subscriber的onNext/onCompleted/onError方法。
这就是全部的过程。
看到这里,估计大家又迷糊了。没关系,依然画个图来说,图中省略了create中的创建步骤:
全过程结合图我们最后再顺一下思路:
- 首先创建过程也就是create()方法中创建了一个Observable,并有一个onSubscribe属性;
- 其次在订阅过程也就是subscribe()方法中,调用了create()方法中创建的Observable的onSubscribe属性的call方法;
- 最后这个call回调的就是代码中创建的Subscriber的onNext/onCompleted/onError方法。
不知道大家还记得那个Log日志么,从日志可以看到,将onNext与onCompleted方法执行完后,call方法才结束。这也印证了call方法回调Subscriber的方法这一说。
结语
好了,终于把源码中比较简单的部分讲解完了,等于是再复习了一边之前学的。
而且终于也把RxJava的入门知识讲解完了。后面的文章中,就开始学如何稍微高级一点的运用RxJava,比如map/flatmap操作符、lift的原理、线程控制的原理、各种运用场景等等,还有很长的路要走啊。
最后这是我第一次试着讲解源码,而且也在边学边分享,所以肯定有错误或不清楚的地方,欢迎大家指正与交流。
参考资料
项目源码
IamXiaRui-Github-FirstRxJavaDemo
个人博客:www.iamxiarui.com