工作生活

RxJava学习总结-合并类操作符

2019-07-02  本文已影响0人  取了个很好听的名字

前言

上一篇文章我们学习了flatMap和concatMap,他们两个都是将上游发送的数据都封装成一个个的Observable,再有一个Observable进行发送。本文我们将在学习一个新的操作符-zip

zip

zip专用于合并事件,该合并不是连接(flatMap,concatMap),而是两两配对。它按照严格的顺序应用这个函数。因此它只发射与发射数据项最少的那个Observable对象一样多的数据。


zip.png

通过分解动作我们可以看出:

1、组合的过程是分别从两根水管里严格按照事件的发送顺序各取出一个事件来进行组合, 并且每一个事件只会被使用一次,也就是说不会出现圆形1事件和三角形B事件进行合并,也不可能出现圆形2和三角形A进行合并的情况.

2、最终下游收到的事件数量是和上游中发送事件最少的那一根水管的事件数量相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了。

代码如下:

   Observable<Integer> observable_one = Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onNext(4);
                    }
                }).subscribeOn(Schedulers.io());
                Observable<String> observable_two = Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onNext("C");
                    }
                }).subscribeOn(Schedulers.io());

                Observable.zip(observable_one,observable_two, new BiFunction<Integer, String,String>() {

                    @Override
                    public String apply(Integer integer, String string) throws Exception {
                        return integer+string;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onSubscribe(Disposable d) {

                      }

                      @Override
                      public void onNext(String s) {
                             Log.e("onNext",s);
                      }

                      @Override
                      public void onError(Throwable e) {

                      }

                      @Override
                      public void onComplete() {

                      }
                  });

测试结果如下:

07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 1A
07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 2B
07-02 09:42:35.010 19642-19642/com.zhqy.myrxjava E/onNext: 3C

应用范围

zip操作符可以应用在界面所需要的数据需要在两个或以上的接口的数据,当获取到两个接口的数据后再进行展示。

merge

将多个上游合并为一个上游,注意与zip的区别,merge只是将多个上游发送的事件都放在一个上游中进行发送。
代码如下:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                    }
                });

                Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("A");
                        emitter.onNext("B");
                        emitter.onNext("C");
                        emitter.onNext("D");
                    }
                });

                Observable.merge(observable1,observable2)
                          .subscribe(new Consumer<Serializable>() {
                              @Override
                              public void accept(Serializable serializable) throws Exception {
                                     Log.e("accept",serializable+"");
                              }
                          });

测试结果如下:

07-02 11:50:51.225 20224-20224/com.zhqy.myrxjava E/accept: 1
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: 2
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: 3
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: A
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: B
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: C
07-02 11:50:51.226 20224-20224/com.zhqy.myrxjava E/accept: D

startWith/startWithArray

在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列。
代码如下:

Observable.just("1","2","3","4","5")
                          .startWith("我是第一个插入的数据")
                          .startWithArray("我是第二个插入的数据","我是第三个插入的数据")
                          .subscribe(new Consumer<String>() {
                              @Override
                              public void accept(String s) throws Exception {
                                  Log.e("accept",s);
                              }
                          });

测试结果如下:

07-02 13:52:47.579 15192-15192/com.zhqy.myrxjava E/accept: 我是第二个插入的数据
07-02 13:52:47.579 15192-15192/com.zhqy.myrxjava E/accept: 我是第三个插入的数据
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 我是第一个插入的数据
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 1
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 2
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 3
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 4
07-02 13:52:47.580 15192-15192/com.zhqy.myrxjava E/accept: 5

flatMapIterable

latMapIterable() 和flatMap()功能在流程上大体一致,唯一不同的是,flatMap是转一个Observable转换成多个Observable,每一个Observable最后又返回一个Observable。而flatMapInterable是将一个Observable转换成多个Observable,但是每一个Observable最后返回得是Iterable。Iterable,可以理解成返回一个list
代码如下:

  Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(2);
                    }
                }).flatMapIterable(new Function<Integer, Iterable<String>>() {

                    @Override
                    public Iterable<String> apply(Integer integer) throws Exception {
                      ArrayList list=new ArrayList();
                      list.add(integer+"a");
                      list.add(integer+"b");
                      return list;
                    }
                }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("accept",s);
                    }
                });

测试结果


07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b

scan

将数据以一定的逻辑聚合起来,并将计算结果发送出去作为下个数据应用函数时的第一个参数使用
代码如下:

    Observable.just(1,2,3,4,5)
                          .scan(new BiFunction<Integer, Integer, Integer>() {
                              @Override
                              public Integer apply(Integer integer, Integer integer2) throws Exception {
                                  return integer+integer2;
                              }
                          }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("accept",integer+"");
                    }
                });

测试结果

07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 1
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 3
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 6
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 10
07-03 14:21:30.523 3352-3352/com.zhqy.myrxjava E/accept: 15

groupby

将原始Observable发送的数据按照key进行分组,每个分组都会返回一个Observable,这些Observable分别发射其包含的数据。
代码如下:

  Observable.just(1,2,3,4,5)
                        .groupBy(new Function<Integer, Integer>() {
                            @Override
                            public Integer apply(Integer integer) throws Exception {
                                return integer%2;
                            }
                        }).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
                    @Override
                    public void accept(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e("accept",integerIntegerGroupedObservable.getKey()+":"+integer);
                            }
                        });
                    }
                });

测试结果

07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:1
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 0:2
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:3
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 0:4
07-03 14:31:33.472 6798-6798/com.zhqy.myrxjava E/accept: 1:5

window

发送指定数量事件时,就将这些事件分为一组。
代码如下:

  Observable.just(1,2,3,4,5)
                          .window(3)
                          .subscribe(new Observer<Observable<Integer>>() {
                              @Override
                              public void onSubscribe(Disposable d) {
                                  Log.e("onSubscribe","onSubscribe");
                              }

                              @Override
                              public void onNext(Observable<Integer> integerObservable) {
                                  integerObservable.subscribe(new Observer<Integer>() {
                                      @Override
                                      public void onSubscribe(Disposable d) {
                                          Log.e("onSubscribe","onSubscribe");
                                      }

                                      @Override
                                      public void onNext(Integer integer) {
                                           Log.e("onNext",integer+"");
                                      }

                                      @Override
                                      public void onError(Throwable e) {

                                      }

                                      @Override
                                      public void onComplete() {
                                          Log.e("onComplete","onComplete");
                                      }
                                  });
                              }

                              @Override
                              public void onError(Throwable e) {

                              }

                              @Override
                              public void onComplete() {
                                  Log.e("onComplete","onComplete");
                              }
                          });

测试结果

07-03 14:39:32.474 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 1
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 2
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 3
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onSubscribe: onSubscribe
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 4
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onNext: 5
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
07-03 14:39:32.488 9254-9254/com.zhqy.myrxjava E/onComplete: onComplete
上一篇下一篇

猜你喜欢

热点阅读