Android开发积累

RxJava<第十三篇>:线程控制(切换/调度)

2019-03-21  本文已影响35人  NoBugException

RxJava的线程控制主要设计到两种操作符:subscribeOnobserveOn

subscribeOn:如果多次调用,则只有第一次调用有效;
observeOn:如果多次调用,每次有可以切换线程。

(1)默认情况下
    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

打印日志:

图片.png

默认情况下被观察者和观察者是运行在主线程的,如果阻塞50秒(耗时操作)

    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Thread.sleep(50000);
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

这样会阻塞主线程。

这时,我们就需要用到线程控制的知识了。

(2)Scheduler的种类
(3)subscribeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

代码中添加了subscribeOn(Schedulers.io())这句代码,这样就可以从默认主线程切换到IO线程。

我们看一下打印结果

图片.png

所以, 如果单纯用subscribeOn来控制线程,那么被观察者和观察者都会被切换到指定的线程。

如果添加多个, 比如

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.trampoline())
            .subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

那么只有第一次调用subscribeOn有效果。

(4)observeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果

图片.png

我们发现被观察者在主线程运行,观察者在子线程运行。

结论:结合(3)总结的结论是,subscribeOn可以控制被观察者和观察者的线程,observeOn仅可以控制观察者的线程。

(5)subscribeOn和observeOn结合使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果如下:

图片.png

这样观察者就从主线程切换到子线程了。

我们再来举一个稍微复杂的例子。

   Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

执行效果如下:

图片.png

我们发现

Schedulers.from()和AndroidSchedulers.mainThread()就不介绍了。

上一篇 下一篇

猜你喜欢

热点阅读