android开发专题

RxJava操作符使用指南(二)

2018-11-10  本文已影响30人  小白兔兽型大发
妹子镇楼

过滤

过滤类型的操作符就比较简单啦,主要就是将数据进行筛选,比如我们最常用的:

filter

通常用这个操作符去过滤掉不需要的数据,保证下游只接收我们想要的,比如,我现在只想要2号店的所有销售人员的数据:

   Flowable.fromIterable(cityStores)
                .concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {

                    int delay = 0;
                    if (1 == cityStore.getCityCtoreId()) {
                        delay = 500;
                    }
                    return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
                })
                .filter(salesman -> salesman.getCityStoreId() == 2)
                .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                        + salesman.getSalesManId() + "号的业绩为:"
                        + salesman.getSalesPerformance() + "元"));

        //    2店的0号的业绩为:60元
        //    2店的1号的业绩为:27元
        //    2店的2号的业绩为:33元

完全不用写for循环判断了!!

debounce

翻译过来就是防抖动的意思,其含义为,当我们发送了一次数据,在规定的时间内,又发送了新的数据,那么这一次发送的数据会被丢掉,先来看个例子

Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                emitter.onNext("张三");
                Thread.sleep(299);
                emitter.onNext("李四");
                Thread.sleep(300);
                emitter.onNext("王五");
                Thread.sleep(350);
                emitter.onNext("赵六");
                Thread.sleep(250);
                emitter.onNext("费七");
                Thread.sleep(100);
                emitter.onNext("陈八");
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER)
                .debounce(300, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        Log.d(TAG, str);
                    }
                });
        //         李四
        //         王五
        //         陈八

这边我们规定的时间为300毫秒,

distinct

去重,单纯的调用distinct(),就是去掉重复发射的元素

 List<Integer> distinctList = Arrays.asList(1, 1, 3, 3, 5);
        Flowable.fromIterable(distinctList)
                .distinct()
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG,integer+"");
            }
        });
          //   1
          //   3
          //   5

还有一个重载的方法可以根据变量去重

        List<DistinctBean> distinctBeans = new ArrayList<>();
        distinctBeans.add(new DistinctBean("aaaa"));
        distinctBeans.add(new DistinctBean("bbbb"));
        distinctBeans.add(new DistinctBean("aaaa"));
        distinctBeans.add(new DistinctBean("cccc"));
        distinctBeans.add(new DistinctBean("bbbb"));
        Flowable.fromIterable(distinctBeans)
                .distinct(new Function<DistinctBean, String>() {
                    @Override
                    public String apply(DistinctBean distinctBean) throws Exception {
                        return distinctBean.str;
                    }
                })
                .subscribe(distinctBean -> Log.d(TAG, distinctBean.toString()));
//        DistinctBean{str='aaaa'}
//        DistinctBean{str='bbbb'}
//        DistinctBean{str='cccc'}

也就是我们可以在apply方法中去指定一个key,来判断是否是重复的数据

ElementAt

获取指定位置的发射数据,索引是从0开始

 List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
        Flowable.fromIterable(mList)
                .elementAt(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, integer + "");
                    }
                });
        //   4

first

只发射第一个数据

   List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
        Flowable.fromIterable(mList)
                .first(1)
                .subscribe(integer -> Log.d(TAG, integer + ""));
        //   1

如果用first的话,有个参数,这个参数是默认的item索引,也就是说原数据一直没发送,那么会发送默认索引上的值,不要参数的话可以换成firstElement

last

只发射最后一个数据

Flowable.fromIterable(mList)
                .last(1)
                .subscribe(integer -> Log.d(TAG, integer + ""));
        //  5

跟first完全一致,不要参数的话换成 lastElement

sample

sample就是抽样的意思,我们可以指定在某一个时间,对发射的数据进行采集,采集数据的标准时之前发射阶段的最有一个数据,比如:

  Flowable.interval(1000, TimeUnit.MILLISECONDS)
                .sample(3000, TimeUnit.MILLISECONDS)
                .subscribe(aLong -> Log.d(TAG, aLong + ""));
        //        1
        //        4
        //        7
        //        10 
        //        ..

我们开一个定时任务,1秒钟发送一个数字(从0开始),3秒之后采集到最后一个发射数据1,再过3秒采集到了4,依次类推..

skip/skipLast

Flowable.fromIterable(mList).skip(2)
      .subscribe(integer -> Log.d(TAG,integer.toString()));
        //   3
        //   4
        //   5

另外,我们要说一下skip的重载方法

  1. Javadoc: skipLast(long,TimeUnit)
  2. Javadoc: skipLast(long,TimeUnit,Scheduler)

举个例子

 Flowable.interval(1000,TimeUnit.MILLISECONDS)
                .skip(3000,TimeUnit.MILLISECONDS)
                .subscribe(aLong -> Log.d(TAG, aLong.toString()+",当前线程:"
                        +Thread.currentThread().getName()));
        //   3,当前线程:RxComputationThreadPool-2
        //   4,当前线程:RxComputationThreadPool-2
        //   5,当前线程:RxComputationThreadPool-2
        //   ...

这里我们用到了两个参数的重载方法,我们很明显可以看到这个方法skip的值为时间值,示例为跳过前3秒的发送数据,而且输出环境在子线程里,如果想指定线程的话,可以用第三个参数进行指定.

 Flowable.fromIterable(mList).skipLast(2)
      .subscribe(integer -> Log.d(TAG,integer.toString()));
        //   1
        //   2
        //   3

take/takeLast

一个是只发送前面n项,一个正相反,发送后面n项

  Flowable.fromIterable(mList).take(2)
                .subscribe(integer -> Log.d(TAG,integer.toString()));
        //   1
        //   2
        Flowable.fromIterable(mList).takeLast(2)
                .subscribe(integer -> Log.d(TAG,integer.toString()));
        //   4
        //   5

take/takeLast也是有重载方法的,其默认线程也在子线程中(computation),跟skip没区别,我们就不在举例了

组合

zip

官方翻译:zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据,什么意思?先举一个简单的小例子:

 Flowable.zip(Flowable.just("1"), Flowable.just("A", "B", "C"),
                (s1, s2) -> s1 + "-----" + s2).subscribe(s -> Log.d(TAG, s));
  //1----A

zip是rxJava2.0+才有的,它可以解决了复杂页面多接口调用的问题,也就是说一个页面好多接口,如果都响应完了才展示页面的话,就不用同步去请求,我们可以用zip同时发,其原理是把多个Observable组合成新的Observable,比如首页获取个人信息,获取banner,获取活动列表三个接口,就可以:

Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
        
        Flowable.zip(bannerFlowable,
                activityFlowable,
                memberFlowable,
                new Function3<BaseDataResponse<List<Banner>>,
                        BaseDataResponse<List<Activity>>,
                        BaseDataResponse<Member>, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> apply(BaseDataResponse<List<Banner>> bannerResponse,
                                                     BaseDataResponse<List<Activity>> activityResponse,
                                                     BaseDataResponse<Member> memberResponse) throws Exception {
                        HashMap<String, Object> map = new HashMap<>(3);
                        map.put("HOME_BANNER", bannerResponse);
                        map.put("HOME_ACTIVITY", activityResponse);
                        map.put("HOME_MEMBER", memberResponse);
                        return map;
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Map<String, Object>>() {
                    //1.正常这边应该有一个自定义的Observer,可以用subscribeWith 自定义一个Observer,处理数据异
                    //2.由于每个公司的数据形式也不统一,我们举例子就不写了
                    @Override
                    public void accept(Map<String, Object> dataMap) throws Exception {
                        BaseDataResponse < List < Banner >> BannerResponse = cast(dataMap.get("HOME_BANNER"));
                        if(BannerResponse!=null){
                            List<Banner> bannerList = BannerResponse.getData();
//                            mView.showBanner(bannerList);
                        }
                 //       ..
                    }
                });

首先定义三个接口flowable,然后分别作为zip方法的前三个参数,而function方法中,通常会定义一个基本的数据类型封装类,其泛型为三个接口返回的数据类型,第四个是做为apply方法的返回值的,我们可以定义Map,List,Object,都没有问题,其主要是封装返回值,这里我们用的是map,
接下来线程转换,在accept中取出每一个response 转换成我们要的数据,展示,也可以用lamada简化一下:

 Flowable.zip(bannerFlowable, activityFlowable, memberFlowable,
                (bannerResponse, activityResponse, memberResponse) -> {
                    HashMap<String, Object> map = new HashMap<>(3);
                    map.put("HOME_BANNER", bannerResponse);
                    map.put("HOME_ACTIVITY", activityResponse);
                    map.put("HOME_MEMBER", memberResponse);
                    return map;
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(dataMap -> {
                    BaseDataResponse<List<Banner>> BannerResponse = cast(dataMap.get("HOME_BANNER"));
                    if (BannerResponse != null) {
                        List<Banner> bannerList = BannerResponse.getData();
                        //                            mView.showBanner(bannerList);
                    }
                    //        ...
                });

merge

merge就是合并的意思,上文用zip实现的例子同样也可以用merge实现,比如:

 Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
        Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
        Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
        Flowable.merge(bannerFlowable, activityFlowable, memberFlowable)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<BaseDataResponse<? extends Object>>() {
                    @Override
                    public void accept(BaseDataResponse<?> baseDataResponse) throws Exception {
                        //baseDataResponse->banner
                        //baseDataResponse->activity
                        //baseDataResponse->member
                    }
                });

不同的是,zip是打包封装(我们封装成了map一并返回),而merge中的accept方法要走三次,需要我们进行分别判断,我们可以看一下merge的源码:

@SuppressWarnings({ "unchecked", "rawtypes" })
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Flowable<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
        ObjectHelper.requireNonNull(source1, "source1 is null");
        ObjectHelper.requireNonNull(source2, "source2 is null");
        ObjectHelper.requireNonNull(source3, "source3 is null");
        return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), false, 3);
    }

很明显 返回值的是通过fromArray轮流发送在进行flatMap进行变换,再举个例子就很清楚了

        Flowable.merge(Flowable.just("1"), Flowable.just("A", "B", "C"))
                .subscribe(s -> {
                    Log.d(TAG, s);
                    //1
                    //A
                    //B
                    //C
                });

这里有两个问题需要说一下
1.merge是轮流发的,如果有10个要发送,那么第一个抛异常,第二个就不走了,如果想让下面的继续发,需要将merge换成MergeDelayError
2.merge一但我们在发送过程中延迟发送那么发送的数据会显示在最后面,如果想保证顺序,需要将merge换成concat

combineLatest

这个操作符基本用于校验,zip是组合每一个发射的Flowable,combineLatest是只要其中一个发射的时候,他会找其他已经发射过的Flowable的最后一个数据,进行组合,比如:

        Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5);
        Flowable<String> flowable2 = Flowable.just("A", "B", "C");
        Flowable<String> flowable3 = Flowable.just("100", "200");
        Flowable.combineLatest(flowable1, flowable2, flowable3, new Function3<Integer, String, String, String>() {
            @Override
            public String apply(Integer integer, String s, String s2) throws Exception {
                return integer + ":" + s + ":" + s2;
            }
        }).subscribe(string -> Log.d(TAG, string));
//        5,C,100
//        5:C:200

flowable1, flowable2都已经发射过了,到flowable3,当它发射100的时候,5,"c",分别对应了最近发射的数据,所以按apply中的方式进行组合在一起输出,那实际运用当中,我可以用它进行多项条件的判定,都满足,再进行下一步

join

官方解读:任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据,等于加了个时间的范畴,也就是两个Observable,在发射的数据都是有有效期的,同在有效期之内,就组合,比如:

 Flowable<Long> baseFlowable = Flowable.interval(1000, TimeUnit.MILLISECONDS);
        Flowable<String> flowable = Flowable.just("A", "B", "C", "D");
        baseFlowable.join(flowable, new Function<Long, Publisher<Long>>() {
            @Override
            public Publisher<Long> apply(Long aLong) throws Exception {
                Log.d(TAG, "===left:" + aLong);
                return Flowable.timer(2000, TimeUnit.MILLISECONDS);
            }
        }, new Function<String, Publisher<Long>>() {
            @Override
            public Publisher<Long> apply(String s) throws Exception {
                Log.d(TAG, "===right:" + s);
                return Flowable.timer(5000, TimeUnit.MILLISECONDS);
            }
        }, new BiFunction<Long, String, String>() {
            @Override
            public String apply(Long aLong, String s) throws Exception {
                return aLong + "----" + s;
            }
        }).subscribe(string -> Log.d(TAG, string));

        //        ===right:A
        //        ===right:B
        //        ===right:C
        //        ===right:D
        //        ==left:0
        //        0----A
        //        0----B
        //        0----C
        //        0----D
        //        ===left:1
        //        1----A
        //        1----B
        //        1----C
        //        1----D
        //        ...
        //        4----A
        //        4----B
        //        4----C
        //        4----D
        //        ===left:5
        //        ===left:6
        //        ===left:7
        //      ....

我们通过观察不难发现

startWith

在数据序列的开头插入一条指定的项

List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
        Flowable.fromIterable(mList).startWith(100).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, integer.toString());
            }
        });
        //      100
        //      1
        //      2
        //      ...

当然,还可以这样:

   Flowable.fromIterable(mList).startWith(Arrays.asList(100,200))
                .subscribe(integer -> Log.d(TAG, integer.toString()));
        //      100
        //      200
        //      1
        //      2
        //      ...

        Flowable.fromIterable(mList).startWith(Flowable.just(1000,2000))
                .subscribe(integer -> Log.d(TAG, integer.toString()));
        //      1000
        //      2000
        //      1
        //      2
        //      ...

switchOnNext

订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃.
在这边我们借鉴了一下其他人写的例子:

Flowable<Flowable<Long>> flowable = Flowable.interval(500, TimeUnit.MILLISECONDS)
                .map(new Function<Long, Flowable<Long>>() {
                    @Override
                    public Flowable<Long> apply(Long aLong) throws Exception {
                        Log.d(TAG, "====fu: "+aLong );
                        return Flowable.interval(0,200,TimeUnit.MILLISECONDS)
                                .map(new Function<Long, Long>() {
                                    @Override
                                    public Long apply(Long aLong) throws Exception {
                                        Log.d(TAG, "===zi: "+aLong );
                                        return aLong * 10;
                                    }
                                }).take(5);
                    }
                }).take(2);

        Flowable.switchOnNext(flowable)
                .subscribe(aLong -> Log.d(TAG, "onNext: SwitchOnNext  "+aLong));

//        ====fu: 0
//        ===zi: 0
//        onNext: SwitchOnNext  0
//        ===zi: 1
//        onNext: SwitchOnNext  10
//        ===zi: 2
//        onNext: SwitchOnNext  20
//        ====fu: 1
//        ===zi: 0
//        onNext: SwitchOnNext  0
//        ===zi: 1
//        onNext: SwitchOnNext  10
//        ===zi: 2
//        onNext: SwitchOnNext  20
//        ===zi: 3
//        onNext: SwitchOnNext  30
//        ===zi: 4
//        onNext: SwitchOnNext  40

这个例子写的非常好,等于是一个Flowable循环嵌套,最外层只发送两次,当第二次发送数据,内层订阅之后,第一次还没来得及发的数据就被舍弃掉了.

到这边Rxjava的操作符基本都写完了,还有一些辅助,订阅,切换线程之类的比较简单,而且我们在示例中都有使用,就不再单独拎出来一一说明了.能看到这的绝对是真爱,爱你!!!
用的测试代码,请戳这里

上一篇下一篇

猜你喜欢

热点阅读