RxJava 2.x下篇(操作符)

2018-11-12  本文已影响0人  luoqiang108

前言

通过上篇的实例,相信你对RxJava的用处肯定有个基本的了解了。如果还没看过,我强烈建议你先去看上篇,先对整体有个了解,有兴趣之后,带着目的去学枯燥乏味而且多的操作符。

看到这一堆的操作符是不是感觉瞬间就不想往下学了,但是我这里只挑部分我认为比较常用的来讲下,毕竟这么多要全讲的话不仅费时费力,而且效果也不太好,先学会常用的,其它的也是类似的,到时用的再去查询下上手也会很快的。

第一步:初始化 Observable
第二步:初始化 Observer
第三步:通过subscribe建立订阅关系

创建操作符

1、create()操作符

        final String TAG = "RxCreateActivity";
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n");
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext : value : " + integer + "\n");
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n");
            }
        });
onSubscribe : false
Observable emit 1
onNext : value : 1
Observable emit 2
onNext : value : 2
onNext : isDisposable : true
Observable emit 3
Observable emit 4
  • 并且 2.x 中有一个 Disposable 概念,这个东西可以直接调用切断接收器,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。
  • 在发射事件中,我们在发射了数值 3 之后,直接调用了 e.onComplete(),由于在接收器接收了数值 2 之后接收器调用dispose()方法停止接收了,所以本该在接收器中执行的onComplete()方法就不会执行了。
  • 调用e.onComplete()方法之后接收器就不会再接收事件了,但发送事件还是继续的,通过日志能打印出来发射数值3和4我们就可以分析出这一点。如果调用e.onError(Throwable error)效果也是和onComplete()方法类似的,e.onError(Throwable error)对应执行的是接收器中的onError(@NonNull Throwable e)方法。
  • 另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion抛出异常,意味着我们做一些特定操作再也不用 try-catch 了。

2、just()操作符

注:最多只能发送10个参数

        // 1. 创建时传入字符串型1、2、3,
        // 当然也可以为其它类型,被观察者发射事件对象声明为泛型,但是一般同一个被观察者事件类型都是一致的
        // 在创建后就会发送这些对象,相当于执行了onNext("1")、onNext("2")、onNext("3")、onComplete()
        Observable.just("1", "2", "3")
                // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                .subscribe(
                        // 3. 创建观察者 & 定义响应事件的行为
                        // Consumer是RxJava 2.x提供的用于实现观察者的简便式模式
                        new Consumer<String>() {
                            @Override
                            // 每次接收到Observable的onNext()方法发射的事件都会调用Consumer.accept()
                            public void accept(@NonNull String s) throws Exception {
                                Log.e(TAG, "accept : onNext : " + s + "\n");
                            }
                        });
accept : onNext : 1
accept : onNext : 2
accept : onNext : 3

3、fromArray()操作符

        // 1. 设置需要传入的数组
        Integer[] items = {0, 1, 2, 3, 4};
        // 2. 创建被观察者对象(Observable)时传入数组
        // 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
开始采用subscribe连接
接收到了事件0
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
对Complete事件作出响应

4、fromIterable()操作符

5、timer()操作符

注意:默认在新线程,也就是子线程

timer操作符.png
        // 注:timer操作符默认运行在一个新线程上
        // 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
        // 该例子 = 延迟2s后,发送一个long类型数值0
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "开始采用subscribe连接");
                    }
                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }
                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
11-12 14:24:19.586 E: 开始采用subscribe连接
11-12 14:24:21.589 E: 接收到了事件0
11-12 14:24:21.590 E: 对Complete事件作出响应

6、interval()操作符

注意:
默认在新线程,也就是子线程
发送的事件序列 = 从0开始、无限递增1的的整数序列

interval操作符.png
private Disposable mDisposable;

        // 注:interval默认在computation调度器上执行
        // 也可自定义指定线程调度器(第3个参数):interval(long,long,TimeUnit,Scheduler)
        /*参数说明:
          参数1 = 第1次延迟时间;
          参数2 = 间隔时间数字;
          参数3 = 时间单位;*/
        Observable.interval(3, 1, TimeUnit.SECONDS)
                // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                        // 默认最先调用复写的 onSubscribe()
                        Log.e(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
        //使用简便的Consumer观察者,返回值Disposable用于停止任务
        mDisposable = Observable.interval(3, 1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {

            }
        });
11-12 14:33:22.057 E: 开始采用subscribe连接
11-12 14:33:25.060 E: 接收到了事件0
11-12 14:33:26.060 E: 接收到了事件1
11-12 14:33:27.060 E: 接收到了事件2
11-12 14:33:28.060 E: 接收到了事件3
11-12 14:33:29.060 E: 接收到了事件4
...
  @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }

7、defer()操作符

//1. 第1次对i赋值
private Integer i = 10;

        // 2. 通过defer 定义被观察者对象
        // 注:此时被观察者对象还没创建
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        //2. 第2次对i赋值
        i = 15;
        //3. 观察者开始订阅
        // 注:此时,才会调用defer()创建被观察者对象(Observable)
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到的整数是" + integer);
            }
        });
11-12 15:06:38.161 E: 接收到的整数是15

变换操作符

1、map()操作符

Observable.just(1, 2, 3).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept : " + s + "\n");
            }
        });
E: accept : This is result 1
E: accept : This is result 2
E: accept : This is result 3

2、flatMap()操作符

        // 采用RxJava基于事件流的链式操作,也就是:对象.XXX()方法
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        })
                // 采用flatMap()变换操作符
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        final List<String> list = new ArrayList<>();
                        for (int i = 0; i < 3; i++) {
                            list.add("我是事件 " + integer + "拆分后的子事件" + i);
                            // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件的事件序列
                            // 最终合并,再发送给观察者
                        }
                        //合并之后发射事件加个延时,不然执行速度太快可能很难出现无序的现象
                        int delayTime = (int) (1 + Math.random() * 10);
                        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, s);
                    }
                });
我是事件 1拆分后的子事件0
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2

3、concatMap()操作符

我是事件 1拆分后的子事件0
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2

新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序,可能有时候这样打印出来日志结果还是无序的,这个原因是因为我们加了延时导致日志打印顺序变了导致的,忽略就好了。这里就是记住是有序的就好了。

4、buffer()操作符

        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                /*参数解释:
                  count:缓存区大小 = 每次从被观察者中获取的事件数量
                  skip:步长 = 每次获取新事件时跳过事件的数量*/
                .buffer(3, 4) //在事件足够的时候每次取count个值,每次跳过skip个事件
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer value : ");
                        for (Integer i : integers) {
                            Log.e(TAG, i + "");
                        }
                        Log.e(TAG, "\n");
                    }
                });
buffer size : 3
buffer value : 
1
2
3
buffer size : 3
buffer value : 
5
6
7
buffer size : 2
buffer value : 
9
10

组合/合并操作符

组合被观察者数量≤4个,如果要组合大于4个的观察者用concatArray()操作符,用法是一样的这里对concatArray操作符就略过了

1、concat()操作符

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });
concat : 1
concat : 2
concat : 3
concat : 4
concat : 5
concat : 6

2、merge()操作符

        // merge():组合多个被观察者(<4个)一起发送数据
        // 注:合并后按照时间线并行执行
        Observable.merge(
                /*intervalRange操作符参数:
                 * long start:发射事件起始值
                 * long count:发射事件数量
                 * long initialDelay:初始发射延时
                 * long period:反射周期
                 * */
                // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

日志输出结果:两个被观察者发送事件并行执行,输出结果 = 0,2 -> 1,3 -> 2,4


merge操作符.gif

3、zip()操作符

        //需要被合并的第一个被观察者:发射字符串类型事件
        Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });

        //需要被合并的第二个被观察者:发射整型事件
        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
        
        Observable.zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });
String emit : A 
String emit : B 
String emit : C 
zip : accept : A1
Integer emit : 1 
zip : accept : B2
Integer emit : 2 
zip : accept : C3
Integer emit : 3 
Integer emit : 4 
Integer emit : 5 
  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。
  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,上面就是stringObservable发射器。

4、reduce()操作符

Observable.just(1, 2, 3, 4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次计算的数据是:" + s1 + " 乘 " + s2);
                        return s1 * s2;
                        // 本次聚合的逻辑是:全部数据相乘起来
                        // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据 * 原始下1个数据每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: " + s);
            }
        });
本次计算的数据是:1 乘 2
本次计算的数据是:2 乘 3
本次计算的数据是:6 乘 4
最终计算的结果是: 24

功能操作符

1、subscribe()操作符

2、subscribeOn()操作符

类型 含义 应用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 耗时等操作
Schedulers.io() io操作线程 网络请求、读写文件等io密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

3、observeOn()操作符

Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()) //指定被观察者工作线程为子线程
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception { //这里没指定线程会延续被观察者的子线程

                    }
                })
                .observeOn(Schedulers.io()) //指定观察者工作线程为io线程
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception { //io线程
                        return null;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) //切换到UI线程
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception { //UI线程

                    }
                });

4、delay()操作符

// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)

// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延迟时间  & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS) // 延迟3s再发送(onNext(1)、onNext(2)、onNext(3)),其它重载方法由于使用类似,所以此处不作全部展示
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
                });

5、retry()操作符

  1. 接收到 onError()时,重新订阅 & 发送事件
  2. Throwable 和 Exception都可拦截
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
 
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
//retry()
        // 作用:出现错误时,让被观察者重新发送数据
        // 注:若一直错误,则一直重新发送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                .retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "对Complete事件作出响应");
                    }
                });
接收到了事件1
接收到了事件2
接收到了事件1
接收到了事件2
...

过滤操作符

1、take()操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 1. 发送5个事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }

            // 采用take()变换操作符
            // 指定了观察者只能接收2个事件
        }).take(2)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "过滤后得到的事件是:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "对Complete事件作出响应");
            }
        });

// 实际上,可理解为:被观察者还是发送了5个事件,只是因为操作符的存在拦截了3个事件,最终观察者接收到的是2个事件
开始采用subscribe连接
过滤后得到的事件是:1
过滤后得到的事件是:2
对Complete事件作出响应

条件/布尔操作符

背压

结语

最近的博客可能都会有些显得匆忙,里面有些部分甚至都没有写完(之后有空会来完善)。这当中主要原因是自己为了达成今年共50篇博客的目标,加之有些博客的知识点确实太多了,要写全的话时间就来不及了,所以就先大体写下,缺的那点其实是无伤大雅的,你要认真看下来收获还是挺大的。

感谢

Carson_Ho的一系列关于RxJava的文章
这可能是最好的 RxJava 2.x 入门教程(完结版)

上一篇 下一篇

猜你喜欢

热点阅读