RxJava 2.0(二)线程调度Scheduler和操作符

2017-05-30  本文已影响0人  Cris_Ma

RxJava中的线程

默认的情况下,Observable 和 Observer是处在同一线程的,发送事件在哪个线程,处理事件同样也在该线程。
在Activity的onCreate方法中运行以下代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

可以得到以下结果:

D/RxJava: Observable Thread: main
D/RxJava: Observer Thread: main

RxJava中,使用Scheduler来进行线程控制,从而实现了关键的异步操作。

Scheduler

Scheduler可以称之为线程调度器,它指定了发送和处理事件所在的线程。常用的API有以下几个:

介绍完了常用API之后,通过下面的例子来看一下是怎样使用的:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(1);
            }
        })
                .subscribeOn(Schedulers.newThread())//指定observable线程
                .observeOn(AndroidSchedulers.mainThread())//指定Observer线程
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                    }
                });

还是上面的例子,加了两行代码,subscribeOn和observeOn。subscribeOn用来指定发送事件的线程,即事件产生的线程,observeOn指定接收并处理事件的线程,即事件消费线程。运行结果如下:

D/RxJava: Observable Thread: RxNewThreadScheduler-1
D/RxJava: Observer Thread: main

subscribeOn和observeOn都可以多次设置,但是subscribeOn只有第一次设置的值会生效,而observeOn不一样,观察者会按照observeOn的指定顺序依次切换到最后一个线程。

操作符

操作符的作用是在事件发送的过程中完成一些特定的操作,比如对事件的包装,添加额外的动作等等。常用操作符主要有以下几种:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "This is Data No. " + integer ;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

输出结果如下:

D/RxJava: Received data: This is Data No. 1
D/RxJava: Received data: This is Data No. 2
D/RxJava: Received data: This is Data No. 3

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//原始事件,打印线程
                Log.d("RxJava", "Original Observable Thread:  " + Thread.currentThread().getName());
                e.onNext(10);
                e.onNext(20);
                e.onNext(30);
            }
        })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>(){

                            @Override
                            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//打印FlatMap转换后,发送事件的线程
                                Log.d("RxJava", "Observable Thread:  " + Thread.currentThread().getName());
                                Thread.sleep(1000);
                                e.onNext("This is Data No." + integer);
                            }
//指定flatMap转换后发送事件所处的线程
                        }).subscribeOn(Schedulers.io());
                    }
                })
//指定原始事件发送线程
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String str) throws Exception {
//打印观察者所处的线程
                        Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
                        Log.d("RxJava", "Received data: " + str);
                    }
                });

运行结果如下:

Original Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-2
D/RxJava: Observable Thread: RxCachedThreadScheduler-3
Observer Thread: main
D/RxJava: Received data: This is Data No.10
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.20
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.30

从结果可以看出来,最初的Observable包含3个事件,运行在同一个子线程中,如果是耗时操作,采用同步的方式会浪费大量事件,经过FlatMap转换之后,将每个事件转换为一个新的Observable对象,并指定线程,效率一下提高了3倍!

.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return false;
                    }
                })

返回值决定了下游观察者是否能够收到数据,true表示能收到,false表示不能接收到。

.observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("RxJava", "onnext2 Observer Thread: " + Thread.currentThread().getName());
                    }
                })
上一篇 下一篇

猜你喜欢

热点阅读