RxJava 2.x 源码分析(二) 之 FlatMap
FlatMap
- 官方定义:把
被观察者
发射出去的事件转化成新的子被观察者
,然后把这些发射量展开平铺后统一放到一个被观察者
中。官方文档
-
简单来讲就是把
被观察者
每次发射的事件转化成一个子被观察者
,然后通过合并(Merge)所有子被观察者
的事件成总的一系列的事件并发射给观察者
。 -
官方文档中提及到很多语言都拥有 Merge 和 Concat 的合并操作,他们的区别是前者会顺序交错,而后者是不会破坏顺序的。
-
所以
FlatMap
和ConcatMap
的区别是合并后的事件顺序有可能是无序的,但FlatMap
真的不能做到有序事件吗?本文也会探讨这个问题。
RxJava
使用了观察者模式
,封装了很多Observable
和Observer
,针对不同的操作符的调用会用对应的ObservaEble
和Observer
实现。
根据源码Observable
发射的事件都是有序的,使用FlatMap
时由事件转换的被观察者
也是有序地发射自己的事件,我们可以猜测:
-
FlatMap
事件无序的关键是线程,当由事件转换成的多个被观察者
在不同线程中发射事件时,会导致顶层观察者接收到的事件是无序的。 - 反之所有
被观察者
都在同一个线程中发射时间的话FlatMap
的效果跟ConcatMap
是相同的。
为了证实我们的猜测,我们先具体简单的例子:
public static void main(String args[]) {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(
"item " + integer + " sub-item " + 1 + " Observable Thread: " + Thread.currentThread().getName()
, "item " + integer + " sub-item " + 2 + " Observable Thread: " + Thread.currentThread().getName()
, "item " + integer + " sub-item " + 3 + " Observable Thread: " + Thread.currentThread().getName());
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s + " Observer Thread: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
运行结果:
item 1 sub-item 1 Observable Thread: main Observer Thread: main
item 1 sub-item 2 Observable Thread: main Observer Thread: main
item 1 sub-item 3 Observable Thread: main Observer Thread: main
item 2 sub-item 1 Observable Thread: main Observer Thread: main
item 2 sub-item 2 Observable Thread: main Observer Thread: main
item 2 sub-item 3 Observable Thread: main Observer Thread: main
item 3 sub-item 1 Observable Thread: main Observer Thread: main
item 3 sub-item 2 Observable Thread: main Observer Thread: main
item 3 sub-item 3 Observable Thread: main Observer Thread: main
阅读源码之前我们需要知道,RxJava 内部包装了很多Observable
和Observer
,用FlatMap
实现该例子的实现方法不止一种,使用的操作符也可以不一样,所以运行时调用到的Observable
和Observer
不一定相同,所以下面阅读的源码路径只是基于这个例子。
- 首先看看
Observable
的创建过程:
//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
跟上一篇源码分析一样,内部创建的是ObservableCreate
对象,ObservableOnSubscribe
是一个提供subscribe()
作为约定函数的接口。
- 接下来看看
flatMap()
做了什么
//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
mapper
代表的是我们的刚才创建的Function对象,这里有函数式编程的味道(如果用过Kotlin的同学就更加熟悉了),相当于把我们写的函数作为一个参数。
false
表示异常是否需要延迟到所有内部被观察者都结束后才抛出。
继续进下一层
//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
maxConcurrency: 最大并发数
继续进下一层
//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
bufferSize :缓存所有子被观察者
的事件加起来总数大小
继续进下一层
//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
// 检查参数是否合法
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) { //false
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
// ObservableCreate 没有实现 ScalarCallable,所以走这里
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
这里返回一个 ObservableFlatMap
对象,实际上ObservableFlatMap
包装了ObservableCreate
并把ObservableCreate
对象作为常量source
,那么flatMap()
到这里结束了。
- 下面看看
subscribe()
的源码:
//Observable.java
public final void subscribe(Observer<? super T> observer) {
// 检查观察者是否为空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 调用 hook 方法
observer = RxJavaPlugins.onSubscribe(this, observer);
// 检查调用 hook 的观察者是否为空
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 实际订阅操作在这个方法里
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
...
}
}
下面看看 subscribeActual
:
// Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);
居然是一个抽象方法,在哪里被实现了?机智的我们反应过来了,具体实现在flatMap()
返回的ObservableFlatMap
中:
// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
// source : ObservableCreate对象
// t : 观察者
// mapper : 之前我们写的 mapper 函数
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { // false
return;
}
// 本次运行走这里,调用 ObservableCreate 的 subscribe() 并用
// MergeObserver 包装了我们写的 observer
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
可以说ObservableFlatMap
对FlatMap
来说是比较重要的类,里面包含了许多重要逻辑。
调用 ObservableCreate
的 subscribe()
之前我们先看看 MergeObserver
的构造方法:
// ObservableFlatMap.java
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
this.actual = actual; // 我们写的 observer
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency; // Integer.MAX_VALUE
this.bufferSize = bufferSize;
if (maxConcurrency != Integer.MAX_VALUE) {
sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
}
// 创建一个原子性的内部观察者对象数组
this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}
由于ObservableCreate
没有覆写 subscribe()
,所以实际上调用的是父类Observable
的subscribe()
且源码上面已经贴过,可以直接跳过进入ObservableCreate
的subscribeActual()
中:
// ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 创建发射器,这里先忽略发射器内部实现,只需要知道发射器主要用来回调观察者的
// onNext onComplete onError 方法
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 回调 MergeObserver 的 onSubscribe()
// 再由其回调我们写的 observer 的 onSubscribe()
observer.onSubscribe(parent);
try {
// source : 我们创建的 ObservableOnSubscribe 对象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
下面走到了我们写的subcribe()
逻辑里,我们调用了发射器的 onNext()
,看看发射器的源码:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 检测是否切断
// 如果切断了被观察者就接收不到后续的事件了
if (!isDisposed()) {
// observer : MergeObserver 对象
// 回调观察者的 onNext()
observer.onNext(t);
}
}
还记得我们写的 observer
对象被包装成了MergeObserver
,那么进入MergeObserver
的onNext()
:
// ObservableFlatMap.java
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
// 把我们的 mapper 方法里面返回的子被观察者提取出来,
// 由于我们当初用的是 .just() 创建子被观察者,
// 所以子被观察者是 ObservableFromArray 对象,
// 这里先忽略 just() 内部实现
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) { // false
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
// 进入下面的函数
subscribeInner(p);
}
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
// p : ObservableFromArray 对象
if (p instanceof Callable) { //false
tryEmitScalar(((Callable<? extends U>)p));
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
break;
}
}
} else {
break;
}
} else {
// ObservableFromArray 没有实现 Callable 接口,所以走这里
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner); // 飙车入口
}
break;
}
}
}
// 这里主要做内部观察者对象数组的增加
// 通过创建size为原数组长度+1的新数组并作为新的内部观察者对象数组来实现
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
// 获取之前 MergeObserver 创建的内部观察者对象数组
InnerObserver<?, ?>[] a = observers.get();
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length; // 0
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
看到这里我们应该到了 p.subscribe(inner)
,又调用Observable
的subscribe()
?到现在我们好像已经调用了好几次了,证明这两个东西都被包装好几层了,前面我提到过RxJava包装了许多Observable
和Observer
,配合观察者模式一层一层地地传递事件下去,这是 RxJava 的其中一个奥妙之处。
下面整个过程我们会一直在 ObservableFromArray
InnerObserver
ObservableFlatMap
MergeObserver
的方法调用中飙车,可能会感到不适。
我们直接跳过进入从ObservableFromArray
的 subscribeActual()
开始看,这里会有很多的跳转不方便一步步展示,所有相关代码调用顺序和注释都在下面:
// ObservableFromArray.java
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
// [1]
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
// s : InnerObserver 对象
// array : 我们创建子被观察者时调用 .just() 方法生成的String数组
// [4]
s.onSubscribe(d);
// [9]
if (d.fusionMode) { // true 看到这里结束了 onNext() 所有操作
return;
}
d.run();
}
// [2]
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual; // InnerObserver 对象
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
// [3]
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
...
// [7]
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) { //true
fusionMode = true;
return SYNC;
}
return NONE;
}
}
}
InnerObserver:
// ObservableFlatMap.java
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
// [5]
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) { // true
if (s instanceof QueueDisposable) { // true
@SuppressWarnings("unchecked")
// s : FromArrayDisposable 对象
QueueDisposable<U> qd = (QueueDisposable<U>) s;
// 获取合并的标记 这里返回同步标记 SYNC
// [6]
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) { // true
fusionMode = m;
queue = qd;
done = true;
// parent: MergeObserver
// 把 MergeObserver 的所有事件都发送完毕
// [8]
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
...
}
MergeObserver:
// [9]
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
// [10]
// 这里利用之前创建的具有原子性的内部观察者数组发射子被观察者的所有事件
void drainLoop() {
...
}
从上面的源码主要过程是被观察者
的单次调用onNext()
发射的事件变成一个子被观察者
且将其事件都发射给观察者,然后执行下一个onNext()重新走一遍上述代码或者进入其他回调方法,所以整个过程都在同一个线程中且同步执行的,事件的顺序是有序的。
- 到此我们见证了FlatMap发射有序事件的全过程
无序事件
修改下flatMap()
代码,实现无序事件:
- 在创建
子被观察者
的时候调用subscribeOn()
指定发射事件在新的子线程中进行,或者使用delay()
也可以
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final Integer i = integer;
Observable observable_create = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(i + "-" + 1 + " Observable: " + Thread.currentThread().getName());
e.onNext(i + "-" + 2 + " Observable: " + Thread.currentThread().getName());
e.onNext(i + "-" + 3 + " Observable: " + Thread.currentThread().getName());
e.onComplete();
}
});
Observable observable_just = Observable.just(
integer + "-" + 1 + " Observable: " + Thread.currentThread().getName(),
integer + "-" + 2 + " Observable: " + Thread.currentThread().getName(),
integer + "-" + 3 + " Observable: " + Thread.currentThread().getName());
return observable_create
.subscribeOn(Schedulers.newThread());
// .delay((int(Math.random()*1000),TimeUnit.MILLISECONDS);
}
})
运行结果:
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-1 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-2 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-3 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.275 12781-12800/com.example.myapplication E/RxJava: 2-1 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-2 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-1 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-2 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-3 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-3 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
无序事件的子被观察者
不再由.just()
方法创建,而是.create()
代替,原因是由于ObservableFromArray
实现逻辑不能在日志中直接明确的显示子被观察者
发送事件是在子线程进行的。
observable_just 作为子被观察者
的运行结果:
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-1 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-2 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-3 Observable: main Observer :RxCachedThreadScheduler-2
由于调用just()
的时候已经在当前线程(默认主线程)把事件都准备好了,再在子线程中发射出去,所以日志上打印的是主线程
总结:
-
FlatMap
把每个发射的事件都包装成新的子被观察者
,然后这些子被观察者
再把子事件发送出去 -
无序事件每个
子被观察者
发射的所有事件都运行在同一个线程内发射且顺序按照代码的调用顺序 -
当
子被观察者
都不指定子线程而是在当前线程的时候,flatMap
作用跟concatMap
相同