RxJava

我学 rxjava 2(4)- subscribeOn/obse

2018-04-21  本文已影响248人  前行的乌龟

rxjava 的东西是很多的,难免有理解错误的地方,这两天面试碰到有人问 subscribeOn/observeOn 线程切换的问题,我回答完,面试官明显不满意,回来找了找资料,还真是自己理解错了,有必要专门写一篇文章出来。

例子1 : subscribeOn/observeOn 最简单使用


        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "数据源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "监听者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_12.png

按照之前的理解:

我们看这个最简单的例子,的确是这样,那么更复杂的情况呢。

例子2:subscribeOn/observeOn 连着重复写,哪个为准


还是以上面那个最简单的例子来

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "数据源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "监听者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_13.png

从结果来看:

例子3 :添加多个操作符呢


rxjava 中的操作符基本都会生成一个新的 observable 出来,上下游的关系就复杂了,情况会不会有变化呢,这个例子就复杂了

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "数据源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第1次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第2次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第3次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第4次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "监听者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_17.png

从结果看:

例子4:用 observeOn 给多个操作符切换线程


        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "数据源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.computation())
                .observeOn( Schedulers.io() )
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第1次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第2次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第3次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第4次变化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "监听者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_19.png

从结果看:

好了来说说原理


因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 ——扔物线

摘自:拥抱RxJava(番外篇):关于RxJava的Tips & Tricks ,推荐大家去看看原文

我们翻翻源码呢,看看能不能简单的走一下逻辑


1.png 2.png 3.png

不是很好理解,但是大概应该是这个意思

换个更容易理解的描述:

参考资料:


上一篇下一篇

猜你喜欢

热点阅读