RxJava

RxJava操作符使用指南

2018-11-10  本文已影响152人  小白兔兽型大发

  先来张女神的剧照放松一下...


好的好的

  话说gakki又拍新剧了,继爱干家务且投怀送抱不送不行之后,又完美诠释了"假面"超人,微笑pasta,其实如果没人给予阳光,那就做个快乐的小太阳吧.
  好了,我们进入正题..


(*^▽^*)
  最近一直在重构老项目的代码,每天一顿删删删,开心到飞起...相信RxJava到现在没用过的人估计也很少了,我在重构的过程当中,把之前一些复杂的逻辑基本上都用操作符进行了处理,在这边我们就来总结一下,好像有点炒冷饭的意思...那RxJava(2.0+)的操作符大致有这么几类:创建,变换,过滤,组合等等,我们就一类一类说...

创建

首先,我们要说一下被观察者,也就是上游的创建方式,那我们发射一个带这样集合的火箭:
private List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);

create

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (Integer number : mList) {
                    emitter.onNext(number);
                }
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) 
            ...
            @Override
            public void onNext(Integer number)
              ...
            @Override
            public void onError(Throwable e)
             ...
            @Override
            public void onComplete()
            ...

        });

最原始的版本就是用create来创建,但是遍历集合一个一个发,太臃肿了,那我们可以换一种方式,并且有两处地方我们改动一下:
1:我现在只想要onNext()返回,这就要看看是否有Observer的实现类
2:同时Observable是不支持Backpressure的,如果我们发1万条,只会增大内存,不会抛MissingBackpressureException,我们可以用2.0+版本中的Flowable

rang

   Flowable.range(1,6).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Consumer,"+integer);
            }
        });
//                Consumer,1
//                Consumer,2
//                Consumer,3
//                Consumer,4
//                Consumer,5

range操作符是从1开始(包括1)依次发射后面6个数,很明显可以达到我们的要求,然后我们来看看接收的consumer,点进去发现它是一个接口:

package io.reactivex.functions;

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

那这个consumer是2.0的,其对应的也是1.0+版本中的Action,这时候又有个疑问??那有异常我怎么处理?真正在开发之中是要自定义Observer的去统一处理数据异常的,我们写demo就不用那么麻烦,如果用consumer的话,想处理异常还可以:

  Flowable.range(1, 5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Consumer," + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, throwable.toString());
            }
        });

我们可以看到同一个接口用泛型来区分实现类,那之后我们为了代码更简洁,就用new consumer的方式来接收了.

fromIterable

除了rang之外,fromIterable也是可以的,而且这个操作符更贴切,因为其原理就是遍历集合,一个一个发射:

 Flowable.fromIterable(mList)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "Consumer," + integer);
                    }
                });

fromArray

同理:数组

   private Integer[] mArr = {1,2,3};
    ...
   Flowable.fromArray(mArr)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "Consumer," + integer);
                    }
                });

just

这里要不得不说一下just,这个操作符比较强大,它可以一次发送一个集合,数组,也可以一个一个的发送不同类型的数据等等,我们来举个例子:
(为了让它一行就可以,我这边把consumer这个匿名内部类替换成lamada表达式,更直观)

        //发射集合
        Flowable.just(mList).subscribe(numberList -> Log.d(TAG, numberList.toString()));
        //        [1, 2, 3, 4, 5]
        //发射数组
        Flowable.just(mArr).subscribe(numberArr -> Log.d(TAG, Arrays.toString(numberArr)));
        //        [1, 2, 3]
        //发射字符串
        Flowable.just(1, "A", 10).subscribe(str -> Log.d(TAG, str));
        //        1
        //        A
        //        10

defer

这个操作符很有意思,我们之前说的just,from等等,都是在创建上游被观察者的时候确定了数据,也就是说发射的什么数据我是知道的,那如果发射的数据本身就会变呢?比如这样:

        StringBuilder deferMsg = new StringBuilder();
        deferMsg.append("张三发来贺电");
        Flowable<String> justDefer = Flowable.just(deferMsg.toString());
        deferMsg.append(",李四发来贺电");
        justDefer.subscribe(s -> Log.d(TAG, s));
      //张三发来贺电

如果用defer的话

        StringBuilder deferMsg = new StringBuilder();
        deferMsg.append("张三发来贺电");
        Flowable<String> defer = Flowable.defer((Callable<Publisher<String>>) () ->
                Flowable.just(deferMsg.toString()));
        deferMsg.append(",李四发来贺电");
        defer.subscribe(s -> Log.d(TAG, s));
        //张三发来贺电,李四发来贺电

我们可以看到是订阅的时候才会走Flowable的回调方法并且会创建一个新的Flowable/Observable,而不是创建的时候走,有点类似于eventBus的sticky,有人说这个叫延迟订阅,所以defer用于处理动态的数据,保证上游的数据是最新的

repeat

repeat就是我们可以在发射的时候指定重复次数:

   Flowable.just(mList).repeat(2)
                       .subscribe(numbers -> Log.d(TAG, numbers.toString()));
  //[1, 2, 3, 4, 5] [1, 2, 3, 4, 5]

timer

  Flowable.timer(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
                                + Thread.currentThread().getName());
                    }
                });
//Timer:0=====当前线程为:RxComputationThreadPool-1

我们可以用Timer操作符去延迟执行一个任务,比如上面延迟1秒,上游会向下游发送一个数据0,这边有两点需要注意
1.timer的是有第三个参数的,第三个参数可以指定运行线程
2.如果不指定,默认线程为computation,如果想更改ui,记得要做线程转换

Interval

上面说的timer是延迟发送,而interval是以一个固定的时间间隔不断发发发,比如一个很常见的需求,我需要每隔5分钟获取服务器信息,就可以:

   Flowable.interval(5, TimeUnit.MINUTES).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Timer:" + aLong + "=====当前线程为:"
                        + Thread.currentThread().getName());
            }
        });
        // Timer:0=====当前线程为:RxComputationThreadPool-1
        // Timer:1=====当前线程为:RxComputationThreadPool-1
        // Timer:2=====当前线程为:RxComputationThreadPool-1
        // Timer:3=====当前线程为:RxComputationThreadPool-1
        // Timer:4=====当前线程为:RxComputationThreadPool-1 
    

我们发现也是默认在子线程,当然,我们也可以写一个发送短信的倒计时:

        Flowable.interval(0, 1, TimeUnit.SECONDS)
                .take(60)
                .map(aLong -> 60 - aLong)
                .doOnSubscribe(subscription -> tv.setClickable(false))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(aLong -> {
                tv.setText(aLong + "s"); });

简单说一下

  1. interval第一个参数是延迟发送,我们这里用的0秒,也就是不延迟
  2. take:由于interval从0开始无限发,take就是制定发多少次,所以在这边我们让它只发60次.
  3. 由于数据是从0,1,2,3开始发,所以这里我们用map做了一个转换把数据变成了60,59,58...
  4. doOnSubscribe这个方法用于订阅之前执行,它跟doOnNext不太一样,doOnNext跟随onNext,onNext执行多少次,它就执行多少次,而doOnSubscribe只执行一次,所以,一般做初始化操作,比如我点击了按钮发送验证码,在倒计时的时候,你就不能再点击了,所以禁用点击功能.
  5. 正常情况下应该在onComplete中恢复按钮点击功能,并且修改ui为点击/获取.
  6. 以后就在也不用这写一块handler,那边写一块timer/timerTask了..

变换

先说用的最多的:

map

我们在工作中经常遇到数据处理的问题,可不是你想要什么数据形式,后台小哥哥就给的到你的,于是在我们拿到数据后,可以用map进行转换,比如:
1.转换成另外一个集合

        Flowable.just(mArr).map(new Function<List<Integer>, List<String>>() {
            @Override
            public List<String> apply(List<Integer> mList) throws Exception {
                List<String> newList = new ArrayList<>();
                for (Integer number : mList) {
                    newList.add(number + "string");
                }
                return newList;
            }
        }).subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> newList) throws Exception {
                Log.d(TAG, newList.toString());
            }
        });
// [1string, 2string, 3string, 4string, 5string]

2.转换成一个对象

   //  将集合最后一个数字取出并放到对象里
  Flowable.just(mArr)
          .map((Function<List<Integer>, NumberTest>) mList -> {
            NumberTest numberTest = new NumberTest();
            numberTest.setNumber(mList.get(mList.size() - 1));
            return numberTest;
        }).subscribe((Consumer<NumberTest>) numberTest -> Log.d(TAG, numberTest.toString()));
    //  NumberTest{number=5}

  通过map操作符,我们可以在apply这个小方法里面,各种定义数据转换类型,你可以把一个对象换成另外一个对象,或者把一个对象放进一个数组,再或者把数组转换成一个集合,都在整个链式调用中间完成,代码逻辑简单易懂,最重要的是非常美观!!

flatMap

这个操作符跟Map相比有一点不同,我们先用flatmap写一下上面的例子:

   Flowable.just(mArr).flatMap(new Function<List<Integer>, Publisher<List<String>>>() {
            @Override
            public Publisher<List<String>> apply(List<Integer> mList) throws Exception {
                List<String> newList = new ArrayList<>();
                for (Integer number : mList) {
                    newList.add(number + "string");
                }
                return Flowable.just(newList);
            }
        }).subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> newList) throws Exception {
                Log.d(TAG, newList.toString());
            }
        });
//[1string, 2string, 3string, 4string, 5string]

跟map相比,我们可以非常明显的看到在数据转换的类型上是不一样的.
1.map操作符:我们只需要将转换的类型作为apply方法中的返回值即可.
2.flatMap操作符:其原理是将上游发送的数据打包,形成一个全新的Flowable/Observable,所以我们在做数据转换的时候,需要将转换完成的数据类型重新封装成Flowable.
  WTF,麻烦不是一点点啊!


别着急.jpg

通过上面的例子,不难发现我们全程只发送了一个集合,当然转换跟接收也就只有这一次,如果是这种情况的话,像数据转换这种小问题交给map处理就可以了,而flatMap通常可以解决两种问题:
1.for循环嵌套,比如我要获取一个城市所有的门店中的每一名销售人员的业绩,显然有俩集合,一个门店的集合,一个销售人员的集合,那用flatMap我们就可以这样子:

 Flowable.fromIterable(cityStores)
         .flatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
               return Flowable.fromIterable(cityStore.getSalesman()); })
         .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() 
                        + "店的"
                        + salesman.getSalesManId() + "号的业绩为:"
                        + salesman.getSalesPerformance() + "元"));

//    1店的0号的业绩为:58元
//    1店的1号的业绩为:38元
//    1店的2号的业绩为:29元
//    2店的0号的业绩为:49元
//    2店的1号的业绩为:51元
//    2店的2号的业绩为:84元
//    3店的0号的业绩为:65元
//    3店的1号的业绩为:14元
//    3店的2号的业绩为:44元

2.网络请求嵌套,最常见的注册完以后自动登录,我们就完全可以这么写:

       ApiService.goToRegister()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext((Consumer<RegisterBean>) registerBean -> {
                    //注册完成
                })
                .observeOn(Schedulers.io())
                .flatMap((Function<RegisterBean, Publisher<LoginBean>>) 
                registerBean -> ApiService.goToLogin(account, pwd))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe((Consumer<LoginBean>) loginBean -> {
                    //登录成功
                });

或者比如首页,在不用zip的情况下,可以先获取轮播图,再获取列表数据,再获取其他信息,不同的接口只要一条链式调用就ok,当然在实际项目中封装一下线程转换,跟统一数据格式处理,那就更简单了,清爽到没朋友.

concatMap

concatMap跟flatMap相比,它是有序的,上文获取销售业绩的例子,比如我们给第一个店加一个500毫秒的延迟,那么顺序就变了:

    // 2店的0号的业绩为:7元
    ...
    // 3店的2号的业绩为:20元
    ...
    // 1店的2号的业绩为:80元

我们只需要将flatMap替换成concatmap就可以保证顺序

      Flowable.fromIterable(cityStores)
                .concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {

                    int delay = 0;
                    if (1 == cityStore.getCityCtoreId()) {
                        delay = 500;
                    }
                    return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
                })
                .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                        + salesman.getSalesManId() + "号的业绩为:"
                        + salesman.getSalesPerformance() + "元"));
//        1店的0号的业绩为:21元
//        1店的1号的业绩为:2元
//        1店的2号的业绩为:88元
//        2店的0号的业绩为:93元
//        2店的1号的业绩为:33元
//        2店的2号的业绩为:44元
//        3店的0号的业绩为:100元
//        3店的1号的业绩为:43元
//        3店的2号的业绩为:39元

switchMap

这个操作符我感觉用的比较少,不过既然也属于map范畴,我们就简单说一下,我们先将上面的concatMap换成switchMap,并且在变换的时候打个log,并且加上线程:

   Flowable.fromIterable(cityStores)
                .switchMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {

                    int delay = 0;
                    if (1 == cityStore.getCityCtoreId()) {
                        delay = 500;
                    }
                    Log.d(TAG,"=====switchMap:"+cityStore.getCityCtoreId());
                    return Flowable.fromIterable(cityStore.getSalesman()).delay(delay,TimeUnit.MILLISECONDS);
                })
                .subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
                        + salesman.getSalesManId() + "号的业绩为:"
                        + salesman.getSalesPerformance() + "元"));
//        =====switchMap:1|main
//        =====switchMap:2|main
//        2店的0号的业绩为:57元|RxComputationThreadPool-2
//        =====switchMap:3|main
//        3店的0号的业绩为:30元|RxComputationThreadPool-3
//        3店的1号的业绩为:86元|RxComputationThreadPool-3
//        3店的2号的业绩为:41元|RxComputationThreadPool-3

switchMap:不难发现它的原理是当上一个订阅流程未结束时,检测到有新的发射数据了,那么上一个订阅还未走的流程会被舍弃掉,拿上面的例子举例说明:
原数据是一共发射三个对象,当1号店睡500毫秒的时候,二号店发射了,1号店舍弃,这时候注意,所有经过变换再发射的数据统统扔到了子线程里,当2号店0号员工出了业绩,3号店发射了,那么2号店的1,2号员工被舍弃,流程直到3号店所有人员走完

groupBy

没错,同SQ中的groupBy差不多,就是分组的意思,也就是说把发射出去的数据按需求,分成不同的组,比如:现在我想把上面的数据分成两个组,1号店单独放在一个集合里,2,3号店放在一个集合里,那就酱:
(不好理解的就不用lamada了)

  Flowable.fromIterable(cityStores).groupBy(new Function<CityStore, Boolean>() {
            @Override
            public Boolean apply(CityStore cityStore) throws Exception {
                return cityStore.getCityCtoreId()==1;
            }
        }).subscribe(new Consumer<GroupedFlowable<Boolean, CityStore>>() {
            @Override
            public void accept(GroupedFlowable<Boolean, CityStore> objectCityStoreGroupedFlowable) throws Exception {
                Boolean key = objectCityStoreGroupedFlowable.getKey();
                Log.d(TAG,key+"");
                objectCityStoreGroupedFlowable.toList().subscribe(new Consumer<List<CityStore>>() {
                    @Override
                    public void accept(List<CityStore> cityStores) throws Exception {
                        Log.d(TAG,cityStores.toString());
                    }
                });
            }
        });
//        [CityStore {
//            cityCtoreId = 1, salesman = [Salesman {
//                cityStoreId = 1, salesPerformance = 35
//            }, Salesman {
//                cityStoreId = 1, salesPerformance = 31
//            }, Salesman {
//                cityStoreId = 1, salesPerformance = 10
//            }]
//        }]
        
//        [CityStore {
//            cityCtoreId = 2,
//                    salesman = [Salesman {
//                cityStoreId = 2, salesPerformance = 37
//            }, Salesman {
//                cityStoreId = 2, salesPerformance = 96
//            }, Salesman {
//                cityStoreId = 2, salesPerformance = 12
//            }]
//        }, CityStore {
//            cityCtoreId = 3,
//            salesman = [Salesman {
//                cityStoreId = 3, salesPerformance = 19
//            }, Salesman {
//                cityStoreId = 3, salesPerformance = 94
//            }, Salesman {
//                cityStoreId = 3, salesPerformance = 12
//            }]
//        }]

分析:

  1. 在apply这个回调方法中,我们需要定义自己的分组标准,上面定义了1号店返回true,其余返回false
  2. 我们定义的这个标准其实就是个key,那么groupBy这个操作符会根据这个key将原来的数据,也就是3个对象,拆分成2(true/false)个Flowable,然后在分别发射其中的数据,也就需要订阅两次.

buffer

官方的解释的非常到位:定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值,其本质是将一个Observable变换为另一个原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合.
我们详细说一下buffer的两个方法:

Flowable.fromIterable(mList).buffer(3)
                        .subscribe(integers -> Log.d(TAG,integers.toString()));
                //        [1, 2, 3]
                //        [4, 5]

这个方法类似于分组,等于是将之前的集合分成了若干集合发送,每一个集合会小于等于count,并且没有重叠部分,如果count正好等于或者大于集合的长度,就一次性都发射了,这个比较好理解

 Flowable.fromIterable(mList).buffer(3, 2)
                .subscribe(integers -> Log.d(TAG, integers.toString()));
        //    [1, 2, 3]
        //    [3, 4, 5]
        //    [5]

这个重载的方法就有一点不好理解了,我们分成几点说一下

  1. 首先看第一个参数count,count指定了我们每一次的缓存集合长度,第一次[1,2,3]没有任何问题
  2. skip这个值得意思是,每当我缓存的时候,跳过第几个数据,我们设置的是2,第一次缓存[1,2,3]的时候,我们可以想象有一个指针跳过了第二个数据(2)指向了-->3
  3. 那么当第二次缓存的时候,指针指着3,从3开始,继续缓存count(3)个数据[3,4,5],并且skip第二个数据(4),现在的指针指向了-->5.

然后依次类推,这样会有重叠的部分,也可能有丢掉的部分,当然buffer这个操作符还有一些其他的功能,可以戳这里,我们就不详谈了.

window

window这个操作符是把数据分成了每个窗口,逐次发射,实现起来跟buffer类似,但是他不是一次性发完了所有数据,其原理是把原始数据分好组以后封装成一个新的Flowable/Observable,并且由新的逐个发送数据,比如:

 Flowable.fromIterable(mList).window(3)
                .subscribe(new Consumer<Flowable<Integer>>() {
                    @Override
                    public void accept(Flowable<Integer> integerFlowable) throws Exception {
                        integerFlowable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d(TAG, integer.toString());
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                Log.d(TAG, "onError");

                            }
                        }, new Action() {

                            @Override
                            public void run() throws Exception {
                                Log.d(TAG,"onComplete");
                            }
                        });
                    }
                });
//        1
//        2
//        3
//        onComplete
//        4
//        5
//        onComplete

scan

scan这个操作符的意思是对每一个数据进行同一个函数操作,并且以生成的值作为计算标准,计算下一个数据,比如:

Flowable.fromIterable(mList).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                Log.d(TAG, "integer:" + integer);
                Log.d(TAG, "integer2:" + integer2);
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, integer.toString());
            }
        });
//        1
//        integer:1
//        integer2:2
//        3
//        integer:3
//        integer2:3
//        6
//        integer:6
//        integer2:4
//        10
//        integer:10
//        integer2:5
//        15

由此可见 apply方法中的第一个integer对应了我们每一次计算出来的和,而第二个integer2就是我们每一次发送的原数据,累加发送,依次执行..

变换就写完了,我认为变换还是有一点不好理解的..本想一篇写完,不过觉得确实是太长了,下一篇我们会写一下剩下的操作符!

上一篇下一篇

猜你喜欢

热点阅读