RxAndroid开发经验谈Android开发

RxJava 知识梳理(3) - RxJava2 基础知识小结

2017-08-17  本文已影响422人  泽毛

前言

首先要感谢 Season_zlc 的一系列RxJava2的教程,关于上游、下游、水缸的类比,让我对于整个RxJava2的基本思想有了更加清晰的认识。大家有兴趣的话一定要多看看,写的通俗易懂,传送门:给初学者的 RxJava 2.0 教程 (一) ,本文的思想都来源于它的一系列文章。

文章比较长,为了避免耽误大家的时间,先列出需要介绍的知识点:


一、RxJava2 的基本模型

1.1 使用实例

在开始学习之前,我们先看一下最简单的例子:

dependencies {
    //在build.gradle中,导入依赖。
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
    public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }

1.2 基本元素

在上面的例子中,涉及到了以下五个类:

对于整个模型,可以总结为以下几点:

各关键元素的UML图如下:

1.3 ObservableEmitter

用于 发出事件,它可以分别发出onNext/onComplete/onError事件:

其继承关系如下图所示:


1.4 Disposable

理解成为 水管的机关,当调用它的dispose方法时,将会将上游和下游之间的管道切断,从而导致 下游接收不到事件

我们来模拟一下,在下游收到2之后,通过Disposable来切断上游和下游之间的联系:

    public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
                if ("2".equals(s)) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }

最终的运行结果为:


1.5 Subscribe 的重载方法

通过subscribe确定上游和下游的联系有以下几种方法:


可以看到,这里可以分为三类:

对于不使用Observer类作为形参的subscribe函数,其实实现的功能和使用Observer类作为参数的方法相同,只不过它们是将Observer的四个回调分解成形参,有参数的回调用Consumer<T>代替,而没有参数的则用Action代替。

二、线程切换

2.1 基本概念

2.2 线程类型

2.3 示例

在链式调用当中,我们可以通过observeOn方法多次切换管道下游处理消息的线程,例如下面的代码,我们对下游进行了两次线程的切换:

    static void mapSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
                observableEmitter.onNext("true");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
                observableEmitter.onNext("false");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
                observableEmitter.onComplete();
            }
        //1.指定了subscribe方法执行的线程,并进行第一次下游线程的切换,将其切换到新的子线程。   
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {

            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
                return "true".equals(s);
            }
        //2.进行第二次下游线程的切换,将其切换到主线程。    
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {

            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
            }
        });
    }

以上代码的运行的结果为:


三、Map 和 FlatMap 操作符

3.1 Map

    public static void mapVerify() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        });
        Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        });
        Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
    }

Function为一个接口:


并且在map函数调用完毕之后,将返回一个新的Observable,它的类型为ObservableMap

3.2 FlatMap

3.2.1 示例

    static void flatMapSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                observableEmitter.onNext(1);
                observableEmitter.onNext(2);
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.fromArray("a value of " + integer + ",b value of " + integer);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

map操作符类似,它也接收一个类型为Function的接口,只不过它的? extends R参数类型换成了? extends Observable<? extends R>

3.2.2 FlatMap 不保证下游接收事件的顺序

前面我们说到,flatMap操作符不会保证下游接收事件的顺序,下面,我们就以一个例子来说明,在flatMapapply函数中,我们将一个事件转换成两个Observable,并且加上了延时:

    static void flatMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "flatMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "flatMapOrderSample emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "flatMapOrderSample emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "flatMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

可以看到,最终的输出结果和flatMap收到事件的顺序并不相同:


下面,还是同样的场景,将flatMap换成contactMap
    static void contactMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(2);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "contactMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

最终的运行结果为:


四、Zip 操作符

4.1 基本概念

4.1.1 两个 Observable 运行在同一线程当中

    static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }

此时的运行结果为:


4.1.2 两个 Observable 运行在不同的线程

    static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        }).subscribeOn(Schedulers.io());
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }

运行结果为:


五、背压

“背压”其实就是一种用于解决问题的工具,那么我们的问题又是什么呢?

想必大家在很多文章中都听过这个一句话:在RxJava2中,Observable不支持“背压”,而Flowable支持背压。

5.1 不支持背压的 Observable

关于Observable不支持背压,我们应当从两种情况去考虑,即上游、下游是否位于相同的线程。

5.1.1 Observable 之上游、下游位于相同线程

首先,我们不调用observeOnsubscribeOn方法来改变上游、下游的工作线程,这样,上游和下游就位于同一线程,同时,我们在下游的处理函数中,每收到一个消息就休眠2000ms,以模拟上游处理速度大于下游的场景。

    static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }

从下面的打印结果可以看到,当“使用 Observable,并且上游、下游位于相同线程”时,并不会出现消息堆积的情况,因为上游发射完一条消息后,必须要等到下游处理完该消息,才会发射一条新的消息。

5.1.2 Observable 之上游、下游位于不同线程

接着,我们采用subscribeOnobserveOn来使得上游和下游位于不同的工作线程,其它均和2.2中相同。

    static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }

2.2中不同,当上游和下游位于不同的工作线程,那么上游发送消息时,不会考虑下游是否已经处理了之前的消息,它会直接发送,而这些发送的消息被存放在水缸当中,下游每处理完一条消息,就去水缸中取下一条数据,那么随着水缸中数据越来越多,那么系统中的无用资源就会急剧增加。

5.1.3 关于 Observable 不支持背压的小结

我们之所以说Observable不支持“背压”,就是在2.1介绍的整个族谱中,没有一个类,一种方法能让下游通知上游说:不要再发消息到水缸里了,我已经处理不过来了!

那是不是说Flowable支持“背压”,而Observable不支持,那么Observable就要被取代了呢,其实不然,Flowable对于“背压”的支持是以性能为代价的,我们应当只在有可能出现2.3中上游下游速率不匹配的问题时,才去使用Flowable,否则就应当使用Observable,也就是满足两点条件:

5.2 支持背压的 Flowable

5.2.1 基本概念

5.2.2 基本使用

    static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR);

        sourceFlow.subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

其类结构图和Observable几乎完全一致:

5.3 Flowable 支持背压的策略

从上面的类图可以看出,FlowableObservable最大的不同,就是在create方法中,需要传入额外的参数,它表示的是“背压”的策略,这里可选的值包括:

5.3.1 使用 ERROR 的策略

下面这段代码,我们先将三个事件放入到水缸当中,之后每次调用request方法就会从水缸当中取出一个事件发送给下游。

   static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(1);
        }
    }

当上游和下游位于不同的线程,每次通过Subscription调用request就会从水缸中取出一个事件,发送给下游:

5.3.2 BUFFER 策略

    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(10);
        }
    }

    static void flowBufferSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000;i ++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }

        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

在上面的例子中,我们先把10000条消息放入到水缸当中,之后通过Subscription每次从水缸中取出10条消息发送给下游,演示结果为:

5.3.3 DROP 策略

    static void flowDropSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

我们先往水缸中放入130条消息,之后每次通过Subscription取出60条消息发送给下游,可以看到,最后最多只取到了第128条消息,第129/130条消息被丢弃了。

5.3.4 LATEST 策略

    static void flowLatestSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

从下面的运行结果可以看出,当取出最后一批数据的时候,上游除了收到存储在水缸当中的数据,还额外收到了最后一条消息,也就是第130条数据,这就是DROP策略和LATEST策略的区别:


更多文章,欢迎访问我的 Android 知识梳理系列:

上一篇 下一篇

猜你喜欢

热点阅读