Android-RxJavaAndroid-Rxjava&retrofit&daggerAndroid开发干货集中营

玩转RxJava2.x 学习教程(一)

2017-06-25  本文已影响3673人  依然范特稀西
封面.png

从16年11月份推出RxJava 2.0 ,到现在差不多大半年的时间里,RxJava已经来到了2.x时代,RxJava 1.x 可能也慢慢地被2.x 代替。RxJava 2.x在Reactive-Streams规范的基础上从头开始完全重写,虽然操作符基本没有发生变化,但是因为Reactive-Streams具有不同的架构,因此对一些众所周知的RxJava类型进行了更改。

RxJava 2.x相对与1.x还是有很多的不同,RxJava的类型更改了,很多类的命名和方法的命名发生了变化(可能功能与1.x相同),要想从RxJava 1.x 顺利地过渡到2.x, 就得了解这些变化。因此本教程就带你了解这些变化,从而能更快地上手RxJava2.x。

本篇文章就来先了解一下RxJava 2.x的5种响应类型。


一、RxJava 2.x中5种类型介绍

1 . Observable and Flowable

关于在RxJava 0.x版本引入背压的一个小的遗憾是,没有设计一个单独的基础反应类,而是对Observable本身进行了改装。背压的主要问题在于热源(如:UI事件),不能合理地反压并导致不可预料的异常MissingBackpressureException,这是初学者不期望看到的。

在RxJava 2.x版本中修复了这种情况,将o.reactivex.Observable作为非背压,引入新的io.reactivex.Flowable作为启用背压基础反应类。

好消息是,在2.x版本中,主要的操作符保持不变(同1.x版本),坏消息是,在导入的时候应当小心,它可能会无意的选择非背压的o.reactivex.Observable.

我们应该选哪种?

当构建数据流(作为RxJava的最终消费者)或考虑您的2.x兼容库应该采取和返回什么类型时,您可以考虑几个因素,以帮助您避免诸如MissingBackpressureException或OutOfMemoryError之类的问题。

Observable使用场景:

Flowable使用场景:

2 . Single 使用介绍

Single是2.x版本中的一种基础响应类型,Single是从头开始重新设计的,能单独发射一个onSuccess或者onError事件,它现在的架构来自于the Reactive-Streams设计。它的消费者类型已经从接受rx.Subscriptionrx.Single.SingleSubscriber<T>变为了io.reactivex.SingleObserver<T>,有3个方法:

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}
3 . Completable使用介绍

Completeble类型基本保持不变,1.x的版本已经沿着Reactive-Streams风格设计,所以没有用户级别的更改。

相似地命名改变,rx.Completable.CompletableSubscriber变为带有onSubscribe(Disposable)方法的io.reactivex.CompletableObserver:

interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}
4 . Maybe 使用介绍

RxJava 2.0.0-RC2 介绍了一个新的基础响应类型,它叫做Maybe。从概念上来将,它像是 Single和Completable的结合,它可能发射0个或者1个项目,或者一个error信号。

Maybe类通过依赖MaybeSource作为它的基础接口类型MaybeObserver<T>作为信号响应接口并且遵循协议onSubscribe (onSuccess | onError | onComplete)?因为最多可能发射1个元素,所以Maybe类型没有背压的概念(因为它没有像Flowable和Observable一样有未知长度的可膨胀缓冲区)

这意味着onSubscribe(Disposable)的调用潜在地跟随着其他onXXX方法之一的调用,不同于Flowable,如果这儿只有一个信号值发射信号,那么只有onSuccess被调用,而不会调用complete

二、RxJava 2.x中5种类型使用示例

1 . Observable示例

在写示例之前,我们先来回顾一下 1.x 版本是如何创建Observable和如何订阅的:
RxJava 1.x :

  //创建 observable
       Observable observable =  Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello world");
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
                
       //订阅方式一
        observable.subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {

            }
        });
        
        // 订阅方式二
         observable.subscribe(new Action1() {
             @Override
             public void call(Object o) {
                 // onNext
             }
         });

通过create方法创建Observable,接收一个OnSubscribe接口,其中有一个回调方法call,参数为Subscriber,我们用Subscriber来发射数据。通过subscribe方法来订阅,可以接收一个Subscriber 实现全部方法,也可以接收一个Action1只实现onNext方法。

RxJava 2.x :

       //创建Observable
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Log.e(TAG,"start emitter data");
                e.onNext("Hello");
                e.onNext("world");
                e.onComplete();
            }
        })
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
        
          // 订阅方式一:下游消费者 Observer
          observable.subscribe(new Observer<String>() {
              @Override
             public void onSubscribe(@NonNull Disposable d) {
            // onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
                 Log.e(TAG,"onSubscribe");
              }

              @Override
              public void onNext(@NonNull String s) {
                  Log.e(TAG,"onNext");
                  Log.e(TAG,s);
              }

              @Override
              public void onError(@NonNull Throwable e) {
                  Log.e(TAG,"onError");
              }

              @Override
              public void onComplete() {
                  Log.e(TAG,"onComplete");
              }
          });

         // 订阅方式二:Consumer
         observable.subscribe(new Consumer<String>() {
             @Override
             public void accept(@NonNull String o) throws Exception {
                  Log.e(TAG,"consumer:"+o);
             }
         });

打印结果如下:

06-25 14:31:35.435 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onSubscribe
06-25 14:31:35.437 21505-21853/com.zhouwei.demoforrxjava2 E/MainActivity: start emitter data
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:Hello
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:world
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onComplete

其实我们可以对比一下,1.x 和 2.x 方法都试一样的,只是它们所接收的响应接口改变了,对应变化如下:

 RxJava 1.x       ->   RxJava 2.x
 ---------------------------------------
 OnSubscribe<T>   ->   ObservableOnSubscribe<T>
 Subscriber<T>    ->   Observer<T>
 Subscriber<T>    ->   ObservableEmitter<T>
 Action1<T>       ->   Consumer<T>
 

RxJava 2.x 中对这些接口进行了重新设计,让一个接口的职责更加单一,类的命名和方法命名与它的功能更佳符合(见名知意)。如在1.x 中,Subscriber 既能发射数据,又能消费数据,充当观察者和被观察者。在2.x 中 把它拆解成了2个接口。ObservableEmitter<T>专门用来发射数据,Consumer 专门用来消费数据。 除此之外,在RxJava 2.x 中,多了一个void onSubscribe(@NonNull Disposable d)回调方法,参数为Disposable,Disposable是用来解除订阅关系的,这让我们的解除订阅变得更佳容易(比起1.x 通过subscribe返回 Subscription)。

上面对比了在RxJava 1.x 和2.x 版本创建Observable的方式,其实在RxJava 2.x中,这5种类型的用法是非常相似的,它们的接口命名规则相同,只要你知道其中一种,就知道其他几种类型该如何在上游发射数据和在下游消费数据。create接收的类型都为xxxOnSubscrible(xxx为5种类型对应的名字),发射器为xxxEmitter,具体如下表:

RxJava 2.x 类型 create参数(响应接口) 发射器 Observer
Observable ObservableOnSubscribe ObservableEmitter Observer
Flowable FlowableOnSubscribe FlowableEmitter FlowableSubscriber
Single SingleOnSubscribe SingleEmitter SingleObserver
Completable CompletableOnSubscribe CompletableEmitter CompletableObserver
Maybe MaybeOnSubscribe MaybeEmitter MaybeObserver
2 . Flowable示例

上面对比了RxJava 1.x 和 2.x 创建使用Observable的方式,并且总结了2.x 相关类的改变,如上面表。那么使用Flowable的方式和Observable是很相似的,看一下代码:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                Log.e(TAG,"start send data ");
                for(int i=0;i<100;i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.DROP)//指定背压策略
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new FlowableSubscriber<Integer>() {
            @Override
            public void onSubscribe(@NonNull Subscription s) {
               //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
               //2, 参数为  Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
               //3,  必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
               Log.e(TAG,"onSubscribe...");
               subscription = s;
                s.request(100);
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,"onNext:"+integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG,"onError..."+t);
            }

            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete...");
            }
        });

Flowable和Observable的使用基本相同,只不过Observable不支持背压,而Flowable支持背压。使用的时候,还是要注意几个小细节:
1,创建Flowable的时候需要指定一个背压策略,本文使用的是PBackpressureStrategy.DROP(丢弃策略),RxJava 2.x中,内置了5种背压策略,由于篇幅有限,背压和背压策略下一篇拿出来单独讲。
2,onSubscribe 回调方法中,参数是Subscription而不是Disposable,前文说过,RxJava 2.x 中,订阅的管理换成了Disposable,但是Flowable使用的是Subscription,这个Subscription不是1.x 版本中的Subscription,虽然它有取消订阅的能力。主要用于请求上游元素和取消订阅。
3,在使用Flowable的时候,必须调用Subscription 的requsest方法请求,不然上游是不会发射数据的。看request的方法解释:

3 . Single、Completable 和 Maybe 示例

Single、Completable和Maybe就比较简单,Single用于只发射一个数据,Completable不发送数据,它给下游发射一个信号。而Maybe则是Single和Completable的结合,根据名字就可以看出,它的结果是不确定的,可能发发射0(Completable)或1(Single) 个元素,或者收到一个Error信号。

Single示例:

Single.create(new SingleOnSubscribe<Boolean>() {
            @Override
            public void subscribe(@NonNull SingleEmitter<Boolean> e) throws Exception {
                Log.e(TAG,"subscribe...");
                e.onSuccess(true);
            }
        })
        .observeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new SingleObserver<Boolean>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG,"onSubscribe...");
            }

            @Override
            public void onSuccess(@NonNull Boolean aBoolean) {
                Log.e(TAG,"onSuccess...");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG,"onError...");
            }
        });

Single只发射一个元素,所以没有complete 方法,不像Observable或者Flowable,数据发射完成之后,需要调用complete告诉下游已经完成。

Completable示例:

 Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(@NonNull CompletableEmitter e) throws Exception {
               Log.e(TAG,"start send data");
               e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new CompletableObserver() {
              @Override
              public void onSubscribe(@NonNull Disposable d) {
                  Log.e(TAG,"onSubscribe");
              }

              @Override
              public void onComplete() {
                Log.e(TAG,"onComplete");
              }

              @Override
              public void onError(@NonNull Throwable e) {
                  Log.e(TAG,"onError");
              }
          });

Completable 不会发射数据,只会给下游发送一个信号。回调 onComplete方法。

Maybe示例:

 Maybe.create(new MaybeOnSubscribe<Boolean>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<Boolean> e) throws Exception {
                Log.e(TAG,"start send data");
                 e.onSuccess(true);
                 e.onComplete();

            }
        }).subscribeOn(Schedulers.io())

           .observeOn(AndroidSchedulers.mainThread())

           .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG,"onSubscribe");
            }

            @Override
            public void onSuccess(@NonNull Boolean aBoolean) {
                Log.e(TAG,"1->onSuccess:"+aBoolean);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG,"onError");
            }

            @Override
            public void onComplete() {
                Log.e(TAG,"onComplete");
            }
        });

Maybe是Single和Completable的结合,需要注意的是onSuccessonComplete方法只会执行其中一个,这不同于Observable和Flowable最后是以onComplete()结尾.

如上面的代码,先调用onSuccess发射一个元素,再调用onComplete

e.onComplete();
e.onSuccess(true);

最后打印结果如下:

E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: onComplete

可以看到只回调了 onComplete,我们把调用的顺序调换一下:

e.onSuccess(true);
e.onComplete();

打印结果如下:

E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: 1->onSuccess:true

可以看到调换了之后打印OnSucces()而没有打印onComplete(),这也印证了只会回调其中之一。

三、总结

RxJava 2.x 相比于 1.x 还是有很大的变化,虽然操作符基本不变,但是很多类和接口都是基于Reactive-Streams 规范重新设计的,命名也发生了变换,要想玩转RxJava 2.x ,你得了解这些变化和使用场景,本文介绍了RxJava 2.x 的5种基础响应类型,希望对才开始学习RxJava 2.x 的同学有所帮助。

参考:
What's different in 2.0

上一篇 下一篇

猜你喜欢

热点阅读