Rxjava源码解析--flatMap源码解析
基于rxjava1.1.0
用例代码↓
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
});
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("haha",s);
}
};
observable1.flatMap(new Func1<String, Observable<String>>() {
@Override
⑬
public Observable<String> call(String s) {
return Observable.just(s+"23");
}
}).subscribe(subscriber1);
flatMap源码精简版↓
①
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
}
Map源码↓
②
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
OperatorMap精简版↓
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
⑩
//create subscriber2 传入subscriber3 = o
return new Subscriber<T>(o) {
@Override
public void onNext(T t) {
⑫
o.onNext(transformer.call(t));
}
};
}
}
map lift精简源码↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
③
//create Observable2 OnSubscribe2
return new Observable<R>(new OnSubscribe<R>() {
⑨
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
⑪
onSubscribe.call(st);//onSubscribe1.call(subscriber2)
}
});
}
merge lift精简源码↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
⑤
//create Observable3 OnSubscribe3
return new Observable<R>(new OnSubscribe<R>() {
⑥
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
⑧
onSubscribe.call(st);//onSubscribe2.call(subscriber3)
}
});
}
merge精简版源码↓
④
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(false));//source = observable2
}
OperatorMerge代码片段↓
@Override
⑦
//create subscriber3 传入subscriber1 = child
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
return subscriber;
}
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t instanceof ScalarSynchronousObservable) {
⑭
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
OperatorMerge代码精简片段↓
void tryEmit(T value) {
emitScalar(value, r);
}
OperatorMerge代码精简片段↓
protected void emitScalar(T value, long r) {
boolean skipFinal = false;
⑮
child.onNext(value);
}
代码调用流程由①到最后
代码分解
observable1.flatMap(func).subscirbe(subcriber1)=
observable1.merge(map(func)).subscirbe(subcriber1)=
observable1.merge(observable2).subscirbe(subcriber1)=
observable3.subscirbe(subcriber1)
由上述代码分解可以知道执行observable1.flatMap(func).subscirbe(subcriber1)时map的lift先去创建observable2 onSubscribe2到这里时已经是①→②→③
*//重点
继续执行到④发现merge的调用是source.lift(),这里的source即是observable2 即observable1调用map的lift创建observable2 onSubscribe2, 到节点⑤observable2 调用merge的lift创建observable3 onSubscribe3,所以map lift 中有onSubscribe1的引用 , merge lift 中有onSubscribe2的引用
此时订阅关系变为observable3.subscirbe(subcriber1) = observable3.onSubscribe3.call(subcriber1)即执行⑥到达⑦创建subscriber3 ,继续执行到达⑧执行onSubscribe2.call(subscriber3)到达⑨执行call方法到达⑩创建subscriber2 并传入subscriber3继续执行到达⑪等价于onSubscribe1.call(subscriber2)
继续执行onSubscribe1.call()开始发射数据,subscriber2.onNext("1"),到达⑫,其中transformer.call(t)调用的是⑬生成一个ScalarSynchronousObservable类型的直接发射数据的observable4<String> 并把发射的数据缓存在 ScalarSynchronousObservable类中,继续执行subscriber3.onNext(observable4)到达⑭通过ScalarSynchronousObservable中的方法把observable4中的数据从缓存中取出来赋值给value最后到达⑮执行child.onNext(value);这里child = subscriber1 即subscriber1.onNext("123")
至此流程完结