RxJava Source Code Analysis: flatMap

2017-11-19  Rogge666

Based on RxJava 1.1.0

Example code ↓
        Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber1 = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
                Log.e("haha",s);
            }
        };

        observable1.flatMap(new Func1<String, Observable<String>>() {
            @Override
            // ⑬
            public Observable<String> call(String s) {
                return Observable.just(s+"23");
            }
        }).subscribe(subscriber1);
flatMap source (simplified) ↓
①
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        return merge(map(func));
    }
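
As an analogy only, the same "map, then flatten" decomposition can be sketched with java.util.stream instead of RxJava. The class FlatMapAsMergeOfMap and the helper flatMapViaMap are hypothetical names introduced for this illustration; streams are synchronous and pull-based, so this shows the shape of merge(map(func)), not RxJava's behaviour.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of RxJava.
final class FlatMapAsMergeOfMap {

    // flatMap written as "map, then flatten", mirroring merge(map(func)).
    static <T, R> List<R> flatMapViaMap(List<T> source, Function<T, Stream<R>> func) {
        // Like map(func): every element becomes its own inner stream.
        Stream<Stream<R>> mapped = source.stream().map(func);
        // Like merge(...): flatten the stream of streams by one level.
        return mapped.flatMap(inner -> inner).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same data as the example code: "1" is mapped to a one-element stream holding "123".
        System.out.println(flatMapViaMap(Arrays.asList("1"), s -> Stream.of(s + "23")));
    }
}
```

The intermediate value of type Stream<Stream<R>> plays the role of observable2 in this article, an Observable whose items are themselves Observables.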

map source ↓
②
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
OperatorMap (simplified) ↓
public final class OperatorMap<T, R> implements Operator<R, T> {
    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }
  
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        // ⑩ create subscriber2, passing in subscriber3 (= o)
        return new Subscriber<T>(o) {
            @Override
            public void onNext(T t) {
                // ⑫
                o.onNext(transformer.call(t));
            }
        };
    }
}
lift as called by map (simplified) ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        // ③ create observable2 and onSubscribe2
        return new Observable<R>(new OnSubscribe<R>() {
            // ⑨
            @Override
            public void call(Subscriber<? super R> o) {
                
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                // ⑪
                onSubscribe.call(st); // onSubscribe1.call(subscriber2)
            }
        });
    }
lift as called by merge (simplified) ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        // ⑤ create observable3 and onSubscribe3
        return new Observable<R>(new OnSubscribe<R>() {
            // ⑥
            @Override
            public void call(Subscriber<? super R> o) {
                
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                // ⑧
                onSubscribe.call(st); // onSubscribe2.call(subscriber3)
            }
        });
    }
merge source (simplified) ↓
④
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        return source.lift(OperatorMerge.<T>instance(false));//source = observable2 
    }
OperatorMerge excerpt ↓
    // ⑦ create subscriber3, passing in subscriber1 (= child)
    @Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        return subscriber;
    }

        @Override
        public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t instanceof ScalarSynchronousObservable) {
                // ⑭
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                addInner(inner);
                t.unsafeSubscribe(inner);
                emit();
            }
        }

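OperatorMerge (simplified excerpt) ↓
    void tryEmit(T value) {
        // r is the child's outstanding request count; the full source reads it from the
        // producer and queues the value instead when it cannot emit right away
        emitScalar(value, r);
    }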

OperatorMerge (simplified excerpt) ↓
    protected void emitScalar(T value, long r) {
        boolean skipFinal = false;
        // ⑮
        child.onNext(value);
    }

The call flow runs from ① through to the last marker, ⑮.
Breaking the call down:
observable1.flatMap(func).subscribe(subscriber1) =
merge(observable1.map(func)).subscribe(subscriber1) =
merge(observable2).subscribe(subscriber1) =
observable3.subscribe(subscriber1)

From this breakdown, when observable1.flatMap(func).subscribe(subscriber1) executes, map's call to lift runs first and creates observable2 and onSubscribe2. At this point the flow has gone ① → ② → ③.

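// Key point
Continuing to ④, merge turns out to be a call to source.lift(), where source is observable2. In other words, observable1 calls lift through map to create observable2 and onSubscribe2; then, at ⑤, observable2 calls lift through merge to create observable3 and onSubscribe3. As a result, the OnSubscribe created by map's lift holds a reference to onSubscribe1, and the one created by merge's lift holds a reference to onSubscribe2.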

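The subscription is now observable3.subscribe(subscriber1) = observable3.onSubscribe3.call(subscriber1), so execution enters ⑥ and reaches ⑦, which creates subscriber3. It continues to ⑧ and runs onSubscribe2.call(subscriber3), which enters the call method at ⑨ and reaches ⑩, where subscriber2 is created with subscriber3 passed in. Execution then reaches ⑪, which is equivalent to onSubscribe1.call(subscriber2).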
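
The order described here (subscribers are created from the downstream end upward, then data flows back down) can be reproduced with a minimal sketch. Every type below (LiftSketch, MiniObservable, MiniSubscriber, MiniOnSubscribe, MiniOperator, tagging) is a hypothetical stand-in written for this illustration, with onNext only; it is not RxJava's code, and it assumes nothing beyond the shape of lift shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of lift; not RxJava's implementation.
final class LiftSketch {
    interface MiniSubscriber<T> { void onNext(T t); }
    interface MiniOnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }
    interface MiniOperator<R, T> { MiniSubscriber<T> call(MiniSubscriber<R> downstream); }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;
        MiniObservable(MiniOnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        // Same shape as lift in the article: the new OnSubscribe wraps the downstream
        // subscriber with the operator, then hands the wrapper to the upstream OnSubscribe.
        <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
            return new MiniObservable<R>(downstream -> {
                MiniSubscriber<T> wrapped = operator.call(downstream);
                onSubscribe.call(wrapped); // upstream OnSubscribe captured by this lambda
            });
        }

        void subscribe(MiniSubscriber<T> subscriber) { onSubscribe.call(subscriber); }
    }

    // Pass-through operator that records when its subscriber is created and when it sees a value.
    static <T> MiniOperator<T, T> tagging(List<String> log, String name) {
        return downstream -> {
            log.add("create " + name);
            return value -> {
                log.add(name + ".onNext(" + value + ")");
                downstream.onNext(value);
            };
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> observable1 = new MiniObservable<String>(subscriber -> {
            log.add("onSubscribe1.call");
            subscriber.onNext("1");
        });
        MiniOperator<String, String> mapLike = tagging(log, "subscriber2");
        MiniOperator<String, String> mergeLike = tagging(log, "subscriber3");
        MiniObservable<String> observable2 = observable1.lift(mapLike);
        MiniObservable<String> observable3 = observable2.lift(mergeLike);
        observable3.subscribe(value -> log.add("subscriber1.onNext(" + value + ")"));
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Running main prints "create subscriber3", "create subscriber2", "onSubscribe1.call", then the three onNext lines for subscriber2, subscriber3 and subscriber1 in that order, matching ⑦ → ⑩ → ⑪ followed by the emission.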

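onSubscribe1.call() then starts emitting: subscriber2.onNext("1") arrives at ⑫, where transformer.call(t) invokes ⑬ and produces observable4<String>, a ScalarSynchronousObservable that emits its value directly and stores that value inside the ScalarSynchronousObservable instance. Next, subscriber3.onNext(observable4) reaches ⑭, which uses ScalarSynchronousObservable's get() method to read the stored value out of observable4 and pass it on as value. Finally, ⑮ runs child.onNext(value); here child = subscriber1, so this is subscriber1.onNext("123").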
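
The scalar shortcut at ⑭ and ⑮ can be sketched in isolation. All names below (ScalarMergeSketch, Obs, Sub, Scalar, and the path list used for tracing) are hypothetical stand-ins, with Scalar playing the role of ScalarSynchronousObservable; the sketch assumes a single-threaded, onNext-only world and leaves out backpressure, queuing, completion and error handling, all of which the real OperatorMerge deals with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of merge's scalar fast path; not RxJava's implementation.
final class ScalarMergeSketch {
    interface Sub<T> { void onNext(T t); }
    interface OnSubscribe<T> { void call(Sub<T> subscriber); }

    static class Obs<T> {
        final OnSubscribe<T> onSubscribe;
        Obs(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(Sub<T> subscriber) { onSubscribe.call(subscriber); }

        // map: transform each value before handing it to the downstream subscriber.
        <R> Obs<R> map(Function<T, R> f) {
            return new Obs<R>(child -> onSubscribe.call(t -> child.onNext(f.apply(t))));
        }
    }

    // Stand-in for ScalarSynchronousObservable: keeps its single value in a field.
    static final class Scalar<T> extends Obs<T> {
        final T value;
        Scalar(T value) {
            super(subscriber -> subscriber.onNext(value));
            this.value = value;
        }
        T get() { return value; }
    }

    // merge: scalar inner observables are read directly, others are subscribed to.
    static <T> Obs<T> merge(Obs<Obs<T>> source, List<String> path) {
        return new Obs<T>(child -> source.subscribe(inner -> {
            if (inner instanceof Scalar) {
                path.add("scalar fast path");
                child.onNext(((Scalar<T>) inner).get()); // like ⑭ then ⑮
            } else {
                path.add("inner subscription");
                inner.subscribe(child);
            }
        }));
    }

    static <T, R> Obs<R> flatMap(Obs<T> source, Function<T, Obs<R>> func, List<String> path) {
        return merge(source.map(func), path); // same decomposition as ①
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        Obs<String> observable1 = new Obs<String>(subscriber -> subscriber.onNext("1"));
        Function<String, Obs<String>> func = s -> new Scalar<String>(s + "23");
        flatMap(observable1, func, out).subscribe(value -> out.add("onNext(" + value + ")"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [scalar fast path, onNext(123)]
    }
}
```

If func returned a plain Obs instead of a Scalar, the else branch would run and the inner observable would be subscribed to, which corresponds to the InnerSubscriber branch in the OperatorMerge excerpt.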

That completes the flow.
