Rxjava:flatmap源码分析:onComplete没有运

2017-12-31  本文已影响0人  沈杰3

之前在学习Rxjava时,写demo。发现在FlatMap运算中,即使上游发送了onComplete,在下游也无法执行onComplete:


        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
//                        Log.i(TAG, "!!!!onNext 1 before");
//                        e.onNext(1);
//                        Log.i(TAG, "!!!!onNext 1 after");

                        Log.i(TAG, "!!!!onNext 2 before");
                        e.onNext(2);
                        Log.i(TAG, "!!!!onNext 2 after");

                        // 位置1:如果注释以下代码,在位置10出将无法触发
                        Log.i(TAG, "!!!!onComplete before");
                        e.onComplete();
                        Log.i(TAG, "!!!!onComplete after");
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(final Integer integer) throws Exception {
                        Log.i(TAG, "@@@@flatMap " + integer);
                        Observable<String> ob = Observable
                                .create(new ObservableOnSubscribe<String>() {
                                    @Override
                                    public void subscribe(final ObservableEmitter<String> e) throws Exception {
                                        String value = "####flatMap: Observable2,create: " + integer;
                                        Log.i(TAG,value);
                                        Log.i(TAG, "####flatMap: Observable2, next " + integer + " before");
                                        e.onNext(value);
                                        Log.i(TAG, "####flatMap: Observable2, next " + integer + " after");

                                        if(integer == 2){
                                            // 位置2:如果注释以下代码,在位置10出将无法触发
//                                            Log.i(TAG, "####flatMap: Observable2, onComplete 2 before");
//                                            e.onComplete();
//                                            Log.i(TAG, "####flatMap: Observable2, onComplete  2 after");
                                        }
                                    }
                                });
                        return ob;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "XXXXsubscribe:" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.i(TAG, "TTTTerror");
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        //位置10:触发onComplete .如果注释位置1和位置2中任何一处,都将无法触发这里
                        Log.i(TAG, "VVVVcomplete");
                    }
                }
                );

Log.i(TAG, "VVVVcomplete"); 没有执行

为什么???

分析Flatmap运算符源码,主要是ObservableFlatMap 这个类:

关键类:MergeObserver,InnerObserver
关键方法:mergeObserver.onNext(),merge.onComplete(),mergeObserver.drain(),merge.insertInnser(),merge.removeInner();
innerObserver.onComplete(),

mergeObserver.drain()关键代码:
innserObserver.onComplete(),onError()
QQ图片20171230235254.png
上一篇 下一篇

猜你喜欢

热点阅读