Rxjava2 Observable的结合操作详解及实例

2019-12-30  本文已影响0人  JiangMing_JIM

简要:

需求了解:

在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等。这个时候我们就需要使用 RxJava 的结合操作符来完成这一需求,Rx中提供了丰富的结合操作处理的操作方法。

可用于组合多个Observables的操作方法:

1. CombineLatest

当 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest 操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据。 CombineLatest 则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一 个函数结合它们最近发射的数据,然后发射这个函数的返回值。

img-CombineLatest
解析: combineLatest 操作符可以结合多个Observable,可以接收 2-9 个Observable对象, 在其中原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。此外combineLatest 操作符还有一些接收 Iterable , 数组方式的变体,以及其他指定参数combiner、bufferSize、和combineLatestDelayError方法等变体,在此就不在详细展开了,有兴趣的可以查看官方的相关API文档了解。

实例代码:

    // Observables 创建
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
    Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
    Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
    
    // 1. combineLatest(ObservableSource, ObservableSource [支持2-9个参数]...,  BiFunction)
    // 结合多个Observable, 当他们其中任意一个发射了数据时,使用函数结合他们最近发射的一项数据
    Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
    
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
            if (t1 + t2 == 10) {
                return "Success";   // 满足一定条件,返回指定的字符串
            }
            return t1 + t2 + "";    // 计算所有数据的和并转换为字符串
        }
    }).subscribe(new Consumer<String>() {
    
        @Override
        public void accept(String t) throws Exception {
            System.out.println("----> accept combineLatest(1): " + t);
        }
    });
    
    System.out.println("--------------------------------------------------------");
    // 2. combineLatest(T1, T2, T3, Function)
    // Observables的结合
    Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2, Long t3) throws Exception {
            System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
            return t1 + t2 + t3 + "";   // 计算所有数据的和并转换为字符串
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(2): " + t);
        }
    });

输出:

--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114

Javadoc: combineLatest(T1, T2, T3... , T9, combiner)

2. Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

img-join
Join 操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。

解析: join(other, leftEnd, rightEnd, resultSelector) 相关参数的解析

注意: 这是源Observable和目标Observable发射数据在任意一个基于时间窗口的有效期内才会接收到组合数据,这就意味着可能有数据丢失的情况,在其中一个已经发射完所有数据,并且没有处于时间窗口的数据情况,另一个Observable的数据发射将不会收到组合数据。

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 1. join(other, leftEnd, rightEnd, resultSelector)
    // other: 目标组合的Observable
    // leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期
    // rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期
    // resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象
    sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            return "t1 = " + t1 + ", t2 = " + t2;                   // 对数据进行组合后返回和观察者
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(1): " + t);
        }
    });

    System.in.read();

输出:

-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14   // 此时源t1中已经没有数据还处于时间窗口有效期内

Javadoc: join(other, leftEnd, rightEnd, resultSelector)

groupJoin

groupJoin 操作符与 join 相同,只是参数传递有所区别。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector 可以将原始数据转换为 Observable 类型的数据发送给观察者。

img-groupJoin

示例代码:

    // Observable的创建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
    // groupJoin操作符与join相同,只是参数传递有所区别。
    // resultSelector可以将原始数据转换为Observable类型的数据发送给观察者。
    sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable发射数据的有效期为1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目标Observable发射数据的有效期为1000毫秒
        }
    }, new BiFunction<Long, Observable<Long>, Observable<String>>() {
        @Override
        public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
            System.out.println("--> apply(2) combine: " + t1);            // 结合操作
            return t2.map(new Function<Long, String>() {
                @Override
                public String apply(Long t) throws Exception {
                    System.out.println("-----> apply(2) operation: " + t);
                    return "t1 = " + t1 + ", t2 = " + t;
                }
            });
        }
    }).subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
        }
    });

输出:

-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14

Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)

3. Merge

合并多个Observables的发射物。

img-Merge

使用 Merge 操作符你可以将多个Observables的输出合并,就好像它们是一个单个的 Observable 一样。

3.1 merge

Merge 可能会让合并的Observables发射的数据交错(有一个类似的操作符 Concat 不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物),任何一个原始Observable的 onError 通知会被立即传递给观察者,而且会终止合并后的Observable。

img-merge

除了传递多个Observable给 merge ,你还可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable, merge 将合并它们的输出作为单个Observable的输出。

img-merge-observables

如果你传递一个发射Observables序列的Observable,你可以指定 merge 应该同时订阅的 Observable 的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了 onCompleted 通知。

示例代码:

    // 创建Observable对象
    Observable<Integer> odd = Observable.just(1, 3, 5);
    Observable<Integer> even = Observable.just(2, 4, 6);
    Observable<Integer> big = Observable.just(188888, 688888, 888888);

    // 创建list对象
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(odd);
    list.add(even);
    list.add(big);

    // 创建Array对象
    Observable<Integer>[] observables = new Observable[3];
    observables[0] = odd;
    observables[1] = even;
    observables[2] = big;

    // 创建发射Observable序列的Observable
    Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.just(1, 2));
            emitter.onNext(Observable.just(1, 2, 3));
            emitter.onNext(Observable.just(1, 2, 3, 4));
            emitter.onNext(Observable.just(1, 2, 3, 4, 5));
            emitter.onComplete();
        }
    });

    // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
    // 可接受 2-4 个Observable对象进行merge
    Observable.merge(odd, even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的列表List
    Observable.merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
    // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每个内部观察资源预取的项数)
    // 接受一个Observable的数组Array
    Observable.mergeArray(observables)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
    // 可选参数, maxConcurrency: 最大的并发处理数
    // 接受一个发射Observable序列的Observable
    Observable.merge(sources)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 5. mergeWith(other)
    // merge 是静态方法, mergeWith 是对象方法: Observable.merge(odd,even) 等价于 odd.mergeWith(even)
    odd.mergeWith(even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(5): " + integer);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6

Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSource<ObservableSource> sources, int maxConcurrency)

3.2 mergeDelayError

如果传递给 merge 的任何一个的Observable发射了 onError通知终止了, merge 操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用 mergeDelayError

img-mergeDelayError

MergeDelayError 操作符,mergeDelayError 在合并与交错输出的使用上与 merge 相同,区别在于它会保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

由于MergeDelayError使用上和merge相同 ,所以这里就不做详细分析了,这里就简单描述其中的一种的使用实例。

实例代码:

    // 创建有Error的Observable序列的Observable
    Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {

        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.error(new Exception("Error Test1"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(2, 3));
            emitter.onNext(Observable.error(new Exception("Error Test2"))); // 发射一个Error的通知的Observable
            emitter.onNext(Observable.just(4, 5, 6));
            emitter.onComplete();
        }
    });

    // 6. mergeDelayError
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.mergeDelayError(DelayErrorObservable)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(6)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(6): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(6): " + exceptions);
                    } else {
                        System.out.println("--> onError(6): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(6)");
                }
            });

输出:

--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]

Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)

4. Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个 结合体 发射单个数据项。

img-Zip

Zip 操作符与 Merge 类似,都是合并多个Observables的数据,返回一个Obversable,主要不同的是它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个Observable一样多的数据。

img-Zip-Sources

解析:

  1. Zip 操作符与 Merge 的使用上基本一致,主要不同的是 zip 发射的数据取决于发射数据项最少的那个Observable并且按照严格的顺序去结合数据。
  2. 同样具备静态方法 zip 与对象方法 zipWith,可以传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable。

使用上在此就不做详细的展开了,可参照上面的 Merge 使用方法,下面就针对 zip 的特性实现一个简单的实例。

实例代码:

    // 创建Observable
    Observable<Integer> observable1 = Observable.just(1, 2, 3);
    Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
    
    // zip(sources)
    // 可接受2-9个参数的Observable,对其进行顺序合并操作,最终合并的数据项取决于最少的数据项的Observable
    Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer t1, Integer t2) throws Exception {
            System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
            return t1 + t2 + "";
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept: " + s);  // 最终接受observable1全部数据项与observable2相同数量顺序部分数据
        }
    });

输出:

--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6

Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )

5. StartWith

在数据序列的开头插入一条指定的数据项或者数据序列。

img-StartWith
如果你想要一个Observable在发射数据之前先发射一个指定的数据或者数据序列(可以是单个数据、数组、列表,Observable中的数据),可以使 用 StartWith 操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用 Concat 操作符。) img-StartWith-Items

实例代码:

    // 创建列表List
    List<Integer> lists = new ArrayList<>();
    lists.add(999);
    lists.add(9999);
    lists.add(99999);

    // 创建数组Array
    Integer[] arrays = new Integer[3];
    arrays[0] = 999;
    arrays[1] = 9999;
    arrays[2] = 9999;

    // 1. startWith(item)
    // 在Observable数据发射前发射item数据项
    Observable.just(1, 2, 3)
            .startWith(999)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 2. startWith(Iterable items)
    // 在Observable数据发射前发射items列表中的数据序列
    Observable.just(1, 2, 3)
            .startWith(lists)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 3. startWithArray(items)
    // 在Observable数据发射前发射items数组中的数据序列
    Observable.just(1, 2, 3)
            .startWithArray(arrays)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 4. startWith(ObservableSource other)
    // 在Observable数据发射前发射other中的数据序列
    Observable.just(1, 2, 3)
            .startWith(Observable.just(999, 9999, 99999))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

输出:

--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3

Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)

6. SwitchOnNext

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些 Observables最近发射的数据项。

6.1 switchOnNext

switchOnNext 订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个, switchOnNext 发射的这个新Observable并取消订阅前一个发射数据的旧Observable,开始发射最新的Observable发射的数据。

img-switchOnNext

注意: 当原始Observables发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在 后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射 的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

6.2 switchOnNextDelayError

Observables发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,如果Observables中的Observable有 Error 异常,将保留 onError 通知直到其他没有Error的Observable所有的数据发射完成,在那时它才会把 onError 传递给观察者。

img-switchOnNextDelayError

注意: 如果有多个原始Observable出现了Error, 这些Error通知会被合并成一个 CompositeException ,保留在CompositeException 内部的 List<Throwable> exceptions 中,但是如果只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。

实例代码:

    // 创建Observable
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);

    // 创建发射Observable序列的Observable
    Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 创建发射含有Error通知的Observable序列的Observable
    Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
            emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
            Thread.sleep(1000);
            // 此时发射一个新的observable2,将会取消订阅observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
    // 可选参数 bufferSize: 缓存数据项大小
    // 接受一个发射Observable序列的Observable类型的sources,
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
    Observable.switchOnNext(sources)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("--------------------------------------------------------------------");
    // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
    // 可选参数 prefetch: 与读取数据项大小
    // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
    // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
    Observable.switchOnNextDelayError(sourcesError)
            .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("--> onNext(2): " + t);
                }

                @Override
                public void onError(Throwable e) {
                    // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(2): " + exceptions);
                    } else {
                        System.out.println("--> onError(2): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]

Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)

小结

Rxjava 的合并操作符能够同时处理多个被观察者,并发送相应的事件通知以及数据。常常应用于多业务合并处理场景,比如表单的联动验证,网络交互性数据的校验等,rxjava的合并操作符能够很好的去实现和处理。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

上一篇 下一篇

猜你喜欢

热点阅读