RxJava2.x 分析源码,理解操作符FlatMap
需求很简单
获取手机存储卡中的所有文件.
代码如下
File file = new File(Environment.getExternalStorageDirectory().getPath());
//定义一个被观察者
Observable<File> observable = Observable.just(file)
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return listFile(file);
}
});
//定义一个观察者
Observer<File> observer = new Observer<File>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(File file) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//订阅
observable.subscribe(observer);
}
//递归文件,
private Observable<File> listFile(File file) {
if (file.isDirectory()) {
Observable<File> ob = Observable.fromArray(file.listFiles())
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return listFile(file);
}
});
return ob;
} else {
Log.d("cql", file.getName());
return Observable.just(file);
}
}
到了这里,如果你已经理解了这段代码的意思,那就没必要往下看了,但是如果你看了之后感觉不是很懂,或者模棱两可,那么可以试着往下看。
这段代码其实是之前看视频教程,里面老师举的例子,老师快速的解释了什么是FlatMap,然后又快速的写下了上面的代码,几个下一步,就讲解完了。我在旁边听的两脸懵逼,老师水平很高,但是我悟性太差。实在理解不了,没什么办法,只能自己分析。OK,不多BB下面直接开始。
什么是FlatMap
大家可以简单的看一下官方的介绍。(虽然我看不太懂)
Flatmap
直接分析
我们先看看在代码(被观察者订阅观察者之后)执行下面代码后的操作.
observable.subscribe(observer);
直接看subscribe代码
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//和hook有关暂且放一下
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//关键代码是这一句
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
然后定位到这句
subscribeActual(observer);
可以看到,这个方法是个抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
OK,那么我们直接找他的实现类。回到前面,我们发现这个被观察者的实现类实际上是由下面代码返回的,那么很简单,我们直接确定这个observable的类型就可以了。
Observable<File> observable = Observable.just(file)
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return listFile(file);
}
});
经过调试大法,我们发现了这个实现类为ObservableScalarXMap。先不用管他是什么,直接进去看subscribeActual
public void subscribeActual(Observer<? super R> s) {
ObservableSource<? extends R> other;
try {
//关键代码
other = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
EmptyDisposable.error(e, s);
return;
}
if (other instanceof Callable) {
R u;
try {
u = ((Callable<R>)other).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, s);
return;
}
if (u == null) {
EmptyDisposable.complete(s);
return;
}
ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
s.onSubscribe(sd);
sd.run();
} else {
other.subscribe(s);
}
}
看上面注释为关键代码的那一段
ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
可以看到调用了mapper.apply(value),那么mapper是神马呢?我们来看构造方法
ScalarXMapObservable(T value,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.value = value;
this.mapper = mapper;
}
OK,mapper原来就是我们传进来的function,所以也是直接执行了function的回调方法apply.
到此为止,我们知道了订阅之后代码执行流程是如何执行到apply里面去的。
那么现在,我们继续分析,先看一下listFile里面做了啥。
private Observable<File> listFile(File file) {
if (file.isDirectory()) {
Observable<File> ob = Observable.fromArray(file.listFiles())
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return listFile(file);
}
});
return ob;
} else {
//为文件的时候
Log.d("cql", file.getName());
return Observable.just(file);
}
}
可以看到上面代码判断当file不是个目录的时候,返回Observable.just(file)。那为什么要返回这个?
OK 继续回头看一下之前提到ObservableScalarXMap的subscribeActual方法(apply的实现的地方),为了不让大家回去看,我又粘贴了一份代码,大家继续向下看就可以了。
public void subscribeActual(Observer<? super R> s) {
ObservableSource<? extends R> other;
try {
//关键代码
other = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
EmptyDisposable.error(e, s);
return;
}
if (other instanceof Callable) {
R u;
try {
u = ((Callable<R>)other).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, s);
return;
}
if (u == null) {
EmptyDisposable.complete(s);
return;
}
ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
s.onSubscribe(sd);
sd.run();
} else {
other.subscribe(s);
}
}
}
关键代码还是这句
other =ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
other是apply执行的返回结果也就是Observable.just(file) 。
然后判断了Observable.just(file) 是否为多态Callable
if (other instanceof Callable) {
R u;
try {
u = ((Callable<R>)other).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, s);
return;
}
if (u == null) {
EmptyDisposable.complete(s);
return;
}
ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
s.onSubscribe(sd);
sd.run();
} else {
other.subscribe(s);
}
我们先看看他是不是,继续调试大法发现Observable.just(file)返回值ObservableJust,然后看一下源码
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T>
public interface ScalarCallable<T> extends Callable<T>
OK是Callable的子类,那继续看,其实后面代码已经很明朗了,我们大概扫一下就可以,
ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
s.onSubscribe(sd);
sd.run();
然后sd.run方法
if (get() == START && compareAndSet(START, ON_NEXT)) {
//关键代码
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
OK,看到了调用了onNext,将最终结果返回给了观察者。呦西 原来是这样!如果是文件的话,就直接用just操作符发送给观察者去处理结果。
一种情况我们弄明白了,现在看另一种情况
if (file.isDirectory()) {
Observable<File> ob = Observable.fromArray(file.listFiles())
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(@NonNull File file) throws Exception {
return listFile(file);
}
});
return ob;
}
上面代码现在看很明朗,但是我当时看非常懵逼。所以简单分析一下。
和刚才一样直接分析apply方法的返回值即可。继续调试大法,看到返回值为ObservableFlatMap,然后他和Callable并没有什么关系,所以看下面代码,他应该是走到了else后面other.subscribe(s);
if (other instanceof Callable) {
R u;
try {
u = ((Callable<R>)other).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, s);
return;
}
if (u == null) {
EmptyDisposable.complete(s);
return;
}
ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
s.onSubscribe(sd);
sd.run();
} else {
other.subscribe(s);
}
OK,继续分析other.subscribe(s)的方法体
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
OK又看到了熟悉的subscribe方法了,到这里就可以不需要继续分析下去了,虽然看到了官网文档介绍的MergeObserver。我们看到subscribe方法 就应该知道 下面的流程和之前分析的相似,会继续调用被观察者的apply方法,形成递归。
根据这个例子,简单总结
当使用变换操作符FlatMap的时候,如果返回的Observable是Callable的子类的时候,直接会将数据发射给观察者去处理,如果返回的Observable并非Callable的子类,仍然会继续调用apply方法去处理发射的数据源,直到他为Callable,然后给观察者去处理。从这里可以看出FlatMap操作符很有用!
当然这只是个例子,也只是FlatMap基本的用法,更深入的功能,还要继续深入理解每一句代码的意思。
以上纯属个人观点,帮助像我一样悟性差的人去理解rxjava,有问题的话,你来打我啊,哈哈哈,就到这里了。有问题欢迎指正。