RxJava浅析二——map操作原理
我之前写过RxJava1.x中Map操作的原理,然而时隔半年,我已经全然忘记RxJava1.x是怎么实现的了,唯一的印象是挺复杂的。
当时可能也是浅尝辄止,对其理解不是那么深,过一段时间决定就忘记了。所以把这些现在理解的东西记录下来是很有必要的。
所以,蹬蹬瞪蹬……,RxJava2版本的Map原理闪亮登场。
本文是基于RxJava浅析——事件如何从上游传递到下游。建议先看下这篇。
先上代码,没错,本文就是要分析下这段代码的执行过程。跟上一篇分析区别就在与create()
与subscribe()
之间多了一个map
操作。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError:", e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
我们知道Observable.create()
会返回一个ObservableCreate
对象。ObservableCreate
是Observable
的子类。记住这是我们第一个碰到的具体的Observable
实现类。
那么核心代码就在这里了:
//Observable的map方法。ObservableCreate.map使用的是父类(Observable)的实现
//不是源码,去掉了一些空检查等
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
//附上Observable.create()方法对比一下
//代码位置:io.reactivex.Observable.java
//不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
呀呀呀,这段代码是不是很熟悉,跟create()
方法一样的套路啊。只是这个返回的是一个ObservableMap
,这个ObservableMap
也是Observable
的子类,这是我们碰到的第二个具体的Observable
实现类。
来看看构造方法中看了啥。
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);//其实就是在父类中保存source,不用在乎这个super。
this.function = function;//我们外部实现的Function接口。
}
呀呀呀,也只是保存了一下source
和function
。套路还是一样的。注意注意,这个source
应该是create()
方法返回的ObservableCreate
对象哦~
奥,解释一下这个Function
也是一个接口。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
这个接口中只有一个apply()
方法,接收T
类型参数,返回R
类型参数。在我们的例子里就是接收Integer
类型的参数,返回String
类型的参数。妥妥的,就是一个map
操作嘛。把Integer
类型转换成了String
类型。怎么转换?当然是我们自己实现Function
接口。在例子里只是简单地把Integer
转换成了String
而已。
所以到此为止,啥也没发生。
关键当然还是把上下游连接起来的subscribe(Observer)
方法。
map
操作时候返回的是ObservableMap
对象,所以subscribe(Observer)
方法最后会调到ObservableMap.subscribeActual()
方法。这个还不清楚的,参见“RxJava浅析——事件如何从上游传递到下游”
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); //Point 0
}
这货干了两件事情:
- 创建了
MapObserver
。 - 调用了
source.subscribe()
方法,并把创建出来的MapObserver
传入。
先来讲第一件事情,这是我们除了自己创建的Observer
之外,第一次遇到RxJava2内部创建的Observer
对象。MapObserver
是ObservableMap
的静态内部类。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);//Point 1
this.mapper = mapper;//Point 2
}
}
可以看到这个MapObserver
继承BasicFuseableObserver
, 可以猜到其父类肯定是继承Observer
.
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>
这个QueueDisposable
可以就理解成Disposable
。我们暂时不扯那么远。
回到Point 1,这句话就是把我们外部的Observer
给保存下来。
Point 2,这句话就是把我们的function
保存下来。
重点来了!
看Point 0处的代码,这个source
是我们的ObservableCreate
,这里的入参是刚刚创建的ObserverMap
。所以RxJava内部又发生了我们在“RxJava浅析——事件如何从上游传递到下游”所分析的事情。
还记得么?这个subscribe()
方法最终调到ObservableCreate.subscribeActual()
方法。再来看一遍:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);//Point 3
try {
source.subscribe(parent); //Point 4
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里就很熟悉啦。只是这里的observer
是RxJava内部帮我们创建的ObserverMap
。
Point 3 会调到ObserverMap.onSubscribe()
//代码在BasicFuseableObserver中,非源码。去掉其他的一些代码
@Override
public final void onSubscribe(Disposable s) {
this.s = s;
actual.onSubscribe(this);
}
这个actual
就是我们在创建ObserverMap
时传入的,就是我们外部实现的Observer
。可以把this
传出去,是因为ObserverMap
本身也是Disposable
接口的实现类。当然入参s
(实际上是CreateEmitter
)也会保存下来。
Point 4中会调到我们外部实现的ObservableOnSubscribe
接口的subscribe()
。即执行事件发送。再提一下,这个source
是在创建ObservableCreate
时保存的。
所以呢,当我们使用CreateEmitter
发送数据时: e.onNext(1)
实际上也是先调到CreateEmitter
的onNext(1)
,他调到他自己保存的Observer
的onNext(1)
,在我们的例子里这个就是ObserverMap
。所以我们来看ObserverMap
的onNext
方法。
//代码位置:ObserverMap的onNext方法。
final Function<? super T, ? extends U> mapper;
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
关键点在于mapper.apply(t)
,这句话把T类型的对象转换成了U类型的对象,也就是我们例子中把Integer
类型的对象转换成了String
类型。requireNonNull
只是对转换后的对象进行了空检查。因为这个转换是我们外部提供的实现,并不能保证非空。
最后再调用actual.onNext(v)
就是调到了我们提供的Observer
的onNext
方法,此时类型已经发生了转换。
以上就是map
操作的原理啦。对onError
和onComplete
事件其实没有起什么作用。
最后用一个图来解释:
map操作原理图.png没有Map操作的时候
- 事件流向:CreateEmitter->Observer。
- 创建流的过程如图中数字所示。
增加Map操作的时候事件流向
- 事件流向:CreateEmitter->ObserverMap->Observer。
- 创建流的过程如图中数字所示。