工作生活

RxJava学习总结-转换类操作符

2019-07-01  本文已影响0人  取了个很好听的名字

前言

上一篇文章介绍了RxJava特点:链式编程和RxJava的进程切换,本文会对操作符map,flatMap和concatMap等转换类操作符进行介绍。

Map

map操作符可以将上游发送的事件通过map的函数进行变换,例如上游发送的是Integer类型的数据通过map函数(功能是Integer变换为String)变换后下游接收到了String类型的数据,这就是map操作符的功能,实例代码如下:

 Observable.create(new ObservableOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "全面战争:"+integer;
                    }
                }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("accept",s);
                    }
                });

测试结果如下:

07-01 17:05:41.472 10732-10732/com.zhqy.myrxjava E/accept: 全面战争:1
07-01 17:05:41.473 10732-10732/com.zhqy.myrxjava E/accept: 全面战争:2
07-01 17:05:41.473 10732-10732/com.zhqy.myrxjava E/accept: 全面战争:3
07-01 17:05:41.473 10732-10732/com.zhqy.myrxjava E/accept: 全面战争:4
07-01 17:05:41.473 10732-10732/com.zhqy.myrxjava E/accept: 全面战争:5

通过map操作符可以将上游发送的数据转换成任意类型的数据。

flatMap

flatMap是一个功能强大的操作符,他可以将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。这是什么意思呢?就是你上游发送的事件会被封装成一个或多个Observables,再将这些这些Observables封装到一个Observable中里,这些Observables中事件就会通过这个Observable将事件发送出去。示例代码如下:

Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                    }
                }).subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .flatMap(new Function<Integer, ObservableSource<String>>() {
                      @Override
                      public ObservableSource<String> apply(Integer integer) throws Exception {
                          final ArrayList<String> list=new ArrayList<>();
                          for (int i=0;i<3;i++){
                                list.add("I AM A"+integer);
                          }
                          return Observable.fromIterable(list).delay(3, TimeUnit.SECONDS);
                      }
                  }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("accept",s);
                    }
                });

测试结果如下:

07-01 17:12:36.934 10732-11474/com.zhqy.myrxjava E/accept: I AM A1
07-01 17:12:36.934 10732-11475/com.zhqy.myrxjava E/accept: I AM A2
07-01 17:12:36.935 10732-11474/com.zhqy.myrxjava E/accept: I AM A1
07-01 17:12:36.935 10732-11474/com.zhqy.myrxjava E/accept: I AM A1
07-01 17:12:36.936 10732-11475/com.zhqy.myrxjava E/accept: I AM A2
07-01 17:12:36.936 10732-11475/com.zhqy.myrxjava E/accept: I AM A2
07-01 17:12:36.937 10732-11476/com.zhqy.myrxjava E/accept: I AM A3
07-01 17:12:36.937 10732-11476/com.zhqy.myrxjava E/accept: I AM A3
07-01 17:12:36.937 10732-11476/com.zhqy.myrxjava E/accept: I AM A3
07-01 17:12:36.938 10732-11476/com.zhqy.myrxjava E/accept: I AM A4
07-01 17:12:36.938 10732-11476/com.zhqy.myrxjava E/accept: I AM A4
07-01 17:12:36.938 10732-11476/com.zhqy.myrxjava E/accept: I AM A4
07-01 17:12:36.938 10732-11478/com.zhqy.myrxjava E/accept: I AM A5
07-01 17:12:36.938 10732-11478/com.zhqy.myrxjava E/accept: I AM A5
07-01 17:12:36.938 10732-11478/com.zhqy.myrxjava E/accept: I AM A5

从测试结果可以看出flatMap并不保证事件的发送顺序,如果想要保证事件发送的顺序请使用concatMap。

concatMap

concatMap与flatMap的区别在于是否能保证事件的发送顺序,concatMap能够保证事件的发送顺序,flatMap则不能保证事件的发送顺序。示例代码如下:

 Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                    }
                }).subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .concatMap(new Function<Integer, ObservableSource<String>>() {
                      @Override
                      public ObservableSource<String> apply(Integer integer) throws Exception {
                          ArrayList<String> arrayList=new ArrayList<>();
                          for (int i=0;i<3;i++){
                            arrayList.add("I AM A "+integer);
                          }
                          return Observable.fromIterable(arrayList).delay(1,TimeUnit.SECONDS);
                      }
                  }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("accept",s);
                    }
                });

测试结果如下:

07-01 17:18:07.894 10732-11476/com.zhqy.myrxjava E/accept: I AM A 1
07-01 17:18:07.894 10732-11476/com.zhqy.myrxjava E/accept: I AM A 1
07-01 17:18:07.894 10732-11476/com.zhqy.myrxjava E/accept: I AM A 1
07-01 17:18:08.895 10732-11477/com.zhqy.myrxjava E/accept: I AM A 2
07-01 17:18:08.895 10732-11477/com.zhqy.myrxjava E/accept: I AM A 2
07-01 17:18:08.896 10732-11477/com.zhqy.myrxjava E/accept: I AM A 2
07-01 17:18:09.897 10732-11478/com.zhqy.myrxjava E/accept: I AM A 3
07-01 17:18:09.897 10732-11478/com.zhqy.myrxjava E/accept: I AM A 3
07-01 17:18:09.897 10732-11478/com.zhqy.myrxjava E/accept: I AM A 3
07-01 17:18:10.898 10732-11625/com.zhqy.myrxjava E/accept: I AM A 4
07-01 17:18:10.898 10732-11625/com.zhqy.myrxjava E/accept: I AM A 4
07-01 17:18:10.898 10732-11625/com.zhqy.myrxjava E/accept: I AM A 4
07-01 17:18:11.899 10732-11626/com.zhqy.myrxjava E/accept: I AM A 5
07-01 17:18:11.899 10732-11626/com.zhqy.myrxjava E/accept: I AM A 5
07-01 17:18:11.899 10732-11626/com.zhqy.myrxjava E/accept: I AM A 5

从测试结果可以看出concatMap能够保证上游事件的发送顺序。

感悟

个人认为map操作符可以将上游发送的数据通过函数变换转化为其他数据类型的数据,而flatMap和concatMap则可以解决网络请求中嵌套请求(即请求A后才可以请求B如此反复),如果按照以前的写法写嵌套请求代码会耦合并且会对代码阅读造成困难。而如果使用RxJava的flatMap或者concatMap则会解决嵌套问题并且更容易理解代码帮助阅读者理清思路。

实践

假设现在有一个登录界面在调用登录接口后在调用接口获取应用主页的数据,这样就形成了嵌套调用,即调用登录接口后再调用主页接口实现登录并拉取主界面数据的功能。
首先导入依赖,网络请求我使用的是Retrofit2。

 implementation  'com.squareup.retrofit2:retrofit:2.6.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.6.0'
    implementation('com.squareup.retrofit2:converter-protobuf:2.6.0') {
        exclude group: 'com.google.protobuf', module: 'protobuf-java'
    }

其中Protobuf是一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储,本文使用Protobuf对数据数据进行封装。网络接口如下:

public interface Api {
    @POST("LoginRequest")
    Observable<DafResponse> childLogin(@Body  LoginRequest request);

    @POST("HomePageRequest")
    Observable<DafResponse> childHome(@Body HomePageRequest request);
}

代码如下:

   OkHttpClient client=new OkHttpClient
                        .Builder()
                        .addInterceptor(new ResponseInterceptor())
                        .addInterceptor(new RequestInterceptor())
                        .build();

                Retrofit retrofit=new Retrofit.Builder()
                        .addConverterFactory(ProtoConverterFactory.create())
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                        .baseUrl(url)
                        .client(client)
                        .build();
                final Api api = retrofit.create(Api.class);
                //调用登录接口
                Observable<DafResponse> login = api.childLogin(getLoginRequest());
                login.subscribeOn(Schedulers.io())
                        .concatMap(new Function<DafResponse, ObservableSource<DafResponse>>() {
                            @Override
                            public ObservableSource<DafResponse> apply(final DafResponse response) throws Exception {
                                //调用登录接口成功后调用主页数据接口
                                if (response.getCode().toLowerCase().equals("ok")) {
                                 HomePageRequest build = HomePageRequest.newBuilder().build();
                                    return api.childHome(build);
                                } else {
                                  //调用登录接口失败,发送失败消息
                                   return ObservableError.create(new ObservableOnSubscribe<DafResponse>() {
                                        @Override
                                        public void subscribe(ObservableEmitter<DafResponse> emitter) throws Exception {
                                            emitter.onError(new Throwable(response.getDesc()));
                                        }
                                    });
                                }

                            }
                        })
                        .doOnError(new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                //出现错误后,切断上下游联系,在此可以对错误信息进行处理
                                Log.e("accespt", throwable.getMessage());
                                disposable.dispose();
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<DafResponse>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                disposable=d;
                            }

                            @Override
                            public void onNext(DafResponse response) {
                                 //打印获取到得主界面数据
                                 Log.e("onNext",response.getDesc());
                            }

                            @Override
                            public void onError(Throwable e) {
                                 Log.e("onError",e.getMessage());
                            }

                            @Override
                            public void onComplete() {

                            }
                        });

可以看到登录和获取主界面数据返回的都是一个上游Observable, 而我们的concatMap操作符的作用就是把一个Observable转换为另一个Observable, 因此结果就很显而易见了。

buffer

buffer的作用是定时或定量缓存打包事件,将其发射到下游,而不是一次一个发射!
实例代码如下:

//buffer(count,skip) 第一个参数count代表每次缓存的数量,skip表示下一次缓存的间隔,
        //如skip是1 表示下一次从2开始缓存,可以不写默认skip等于count
         Observable.just(1,2,3,4)
                        .buffer(2)
                        .subscribe(new Consumer<List<Integer>>() {
                            @Override
                            public void accept(List<Integer> list) throws Exception {
                                 Log.e("accept","这是一次发送过来的事件");
                                 for (Integer integer:list){
                                      Log.e("integer",integer+"");
                                 }
                            }
                        });

测试是结果如下:

07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/accept: 这是一次发送过来的事件
07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/integer: 1
07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/integer: 2
07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/accept: 这是一次发送过来的事件
07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/integer: 3
07-02 10:24:14.047 28481-28481/com.zhqy.myrxjava E/integer: 4
上一篇下一篇

猜你喜欢

热点阅读