工作生活

RxJava学习总结-过滤类操作符

2019-07-02  本文已影响0人  取了个很好听的名字

前言

过滤类操作符就是过滤掉特殊的上游发射事件。

distinct

distinct:去除重复事件
代码如下:

 Observable.just(1,1,2,2,3,3,4,4)
                        .distinct()
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e("accept",integer+"");
                            }
                        });

测试结果如下:

7-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 1
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 2
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 3
07-02 10:36:09.637 29251-29251/com.zhqy.myrxjava E/accept: 4

filter

过滤掉不符合条件的

 Observable.just(1,2,3,4,5)
                        .filter(new Predicate<Integer>() {
                            @Override
                            public boolean test(Integer integer) throws Exception {
                                return integer>2;
                            }
                        }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                            Log.e("accept",integer+"");
                    }
                });

测试结果如下:

07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 3
07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 4
07-02 10:41:30.310 29802-29802/com.zhqy.myrxjava E/accept: 5

elementAt

只发射特定下标的事件
代码如下:

  //找下标为1的事件,没有则发送-1
                Observable.just(1,2,3,4,5)
                        .elementAt(100,-1)
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e("accept",integer+"");
                            }
                        });

测试结果

07-02 10:58:30.704 31487-31487/com.zhqy.myrxjava E/accept: -1

skip

忽视上游发射的前几项事件
代码如下:

Observable.just(1,2,3,4,5)
                          .skip(3)
                          .subscribe(new Consumer<Integer>() {
                              @Override
                              public void accept(Integer integer) throws Exception {
                                  Log.e("accept",integer+"");
                              }
                          });

测试结果如下:

07-02 11:05:57.808 4543-4543/com.zhqy.myrxjava E/accept: 4
07-02 11:05:57.809 4543-4543/com.zhqy.myrxjava E/accept: 5

丢弃Observable开始的那段时间发射 的数据
代码如下:

   Observable.interval(500,TimeUnit.MILLISECONDS)
                          .skip(3,TimeUnit.SECONDS)
                          .subscribe(new Consumer<Long>() {
                              @Override
                              public void accept(Long aLong) throws Exception {
                                  Log.e("accept",aLong+"");
                              }
                          });

测试结果

07-02 11:24:26.240 11070-11115/com.zhqy.myrxjava E/accept: 5
07-02 11:24:26.740 11070-11115/com.zhqy.myrxjava E/accept: 6
07-02 11:24:27.240 11070-11115/com.zhqy.myrxjava E/accept: 7
07-02 11:24:27.740 11070-11115/com.zhqy.myrxjava E/accept: 8
07-02 11:24:28.240 11070-11115/com.zhqy.myrxjava E/accept: 9
07-02 11:24:28.740 11070-11115/com.zhqy.myrxjava E/accept: 10
....

take

只接收上游发送的n个事件
代码如下:

  Observable.range(0,100)
                        .take(10)
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e("accept",integer+"");
                            }
                        });

测试结果如下:

07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 0
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 1
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 2
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 3
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 4
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 5
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 6
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 7
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 8
07-02 11:29:27.824 11880-11880/com.zhqy.myrxjava E/accept: 9

takeLast

只接收最后n个上游发送的事件
代码如下:

   Observable.range(0,100)
                        .takeLast(10)
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e("accept",integer+"");
                            }
                        });

测试结果:

07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 90
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 91
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 92
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 93
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 94
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 95
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 96
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 97
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 98
07-02 11:31:57.949 12377-12377/com.zhqy.myrxjava E/accept: 99

ofType

对发送的事件筛选出指定的数据类型发送
代码如下:

  Observable.just(1,2,3,"4","5")
                          .ofType(String.class)
                          .subscribe(new Consumer<String>() {
                              @Override
                              public void accept(String string) throws Exception {
                                    Log.e("accept",string);
                              }
                          });

测试结果如下

07-02 14:07:03.169 19405-19405/com.zhqy.myrxjava E/accept: 4
07-02 14:07:03.170 19405-19405/com.zhqy.myrxjava E/accept: 5

debounce

根据发送的事件时间间隔做出筛选,如果两次发送事件的间隔小于设定的timeout,则会取消前一个事件的发送
代码如下:

 Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        SystemClock.sleep(1000);
                        emitter.onNext(3);
                        emitter.onNext(4);
                        SystemClock.sleep(1000);
                        emitter.onNext(5);
                    }
                }).debounce(500,TimeUnit.MILLISECONDS)
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          Log.e("accept",integer+"");
                      }
                  });

测试结果:

07-02 14:15:40.029 19952-20086/com.zhqy.myrxjava E/accept: 2
07-02 14:15:41.029 19952-20086/com.zhqy.myrxjava E/accept: 4
07-02 14:15:42.029 19952-20086/com.zhqy.myrxjava E/accept: 5

firstElement& lastElement

获取上游发送的第一个和最后一个事件
代码如下:

  Observable.just(1,2,3,4,5)
                          .firstElement()
                          .subscribe(new Consumer<Integer>() {
                              @Override
                              public void accept(Integer integer) throws Exception {
                                  Log.e("accept",integer+"");
                              }
                          });
Observable.just(1,2,3,4,5)
                          .lastElement()
                          .subscribe(new Consumer<Integer>() {
                              @Override
                              public void accept(Integer integer) throws Exception {
                                  Log.e("accept",integer+"");
                              }
                          });

测试结果

07-02 14:22:25.757 20430-20430/com.zhqy.myrxjava E/accept: 1
07-02 14:23:55.439 20621-20621/com.zhqy.myrxjava E/accept: 5
上一篇下一篇

猜你喜欢

热点阅读