RxAndroid常用操作符

2019-03-18  本文已影响0人  Sean1225

RxAndroid入门一文中,我们可以知道,RxJava主要构建了一个主要应用于异步场景、通过观察者模式及使用响应式编程和函数式编程规范实现的生产者-消费者模型:

  1. 事件生产:生产者是ObservableObservable对象的创建除了最基础的create方法之外,还可以使用justfromzip等其他多个方法。RxJava中提供了大量创建Observable的工厂方法,按需取用。
  2. 事件消费:消费者是Observer及其拓展,使用subcribe关联生产者。
  3. 事件加工:这是我们在RxAndroid入门一文中没有提到的,RxJava提供了大量的方法(如mapfilter)对事件进行加工。

事件生产和加工的方法一般也被称为操作符,RxJava提供的大量操作符不可能全部记住,因此只需要记住它们的作用,具体应用时再查阅代码或相关资料。接着,我们来看下一些常见的操作符。

创建Observable

create

方法原型如下:

static <T> Observable<T> create(ObservableOnSubscribe<T> source);

这个我们在入门时已经使用过了,它是创建Observable对象最基础的方法,需要实现ObservableOnSubscribe接口,然后在ObservableOnSubscribe.subscribe中生产和交付事件。

just

justcreate的简化操作,它直接将需要由Emitter发射出去的数据在初始化时传入,有Observer订阅后,Observable会自动依次发射这些数据并最后调用onComplete。示例如下:

Observable.just(1, 2, 3).subscribe(
        (Integer i) -> log("onNext:" + i),
        (Throwable e) -> e.printStackTrace(),
        () -> log("onComplete"));

输出日志如下:

onNext:1
onNext:2
onNext:3
onComplete

amb

方法原型如下:

static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);

所有Observable中,只有最早生产出事件的那个Observable的事件能够被Observer消费。即消费者选定最优的生产者,抛弃其他生产者,判定条件是事件的生产顺序。

Observable.ambArray(
        Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("1")).delay(1, TimeUnit.SECONDS),
        Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("2"))
).subscribe((String s) -> log("onNext:" + s));

上述例子中的日志输出结果是onNext:2,如果删除第一个Observabledelay调用,那么输出就是onNext:1。此外,还有一个成员方法abmWith可以在创建出Observable之后再使用,用来动态添加source。

使用场景:

同时向多个ip发起请求,最快响应的那个服务器将作为后续访问的节点。

concat

方法原型如下(方法有点多,参数数量不一样但功能完全一样的只列出其中一个):

static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch);
static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd);
static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch);

使用concat连接的所有Observable会串行执行,当上一个Observable执行结束后,即触发了onCompleteonError(如果可能触发onError记得设置异常处理器)后,下一个Observable才开始执行(是否立即执行取决于是否设置了delay)。还记得入门时讲解响应式编程中提到的例子吗(不记得的话再翻一下),用RxJava实现如下:

Observable.concat(
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskA());
            emitter.onComplete();
        }),
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskB());
            emitter.onComplete();
        }).delay(1, TimeUnit.SECONDS),
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskC());
            emitter.onComplete();
        }).delay(1, TimeUnit.SECONDS)
).subscribe((Runnable runnable) -> runnable.run(), (Throwable e) -> e.printStackTrace());

应用场景:
concat的应用场景非常广,实际项目中,任务(或业务)之间的线性依赖关系很普遍。

merge

mergeconcat用法类似(方法原型这里就不贴了),差别在于concat是串行的,而merge是并行的。所有Observable并行运行,直到它们全部触发onComplete后,Observer才会触发onComplete,示例如下:

Observable.merge(
        Observable.create((ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
            emitter.onNext("B");
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread()),
        Observable.create((ObservableEmitter<String> emitter) -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
            }
            emitter.onNext("5");
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread())
).subscribe(
        (String s) -> log("onNext:" + s),
        (Throwable e) -> e.printStackTrace(),
        () -> log("onComplete"));

日志输出如下:

onNext:A
onNext:5
onNext:B
onComplete

从日志中可以看到2个Observable是并行运行的(需要手动设置不同的调度线程),并且Observer只回调了一次onComplete,且是在2个Observable都触发了onComplete之后回调。

使用场景:

mergeconcat应该是使用场景最广泛的两种操作了。当一个任务(假设为C)依赖于其它多个任务时(假设为A、B),而A、B之间又没有相互依赖关系,为了保证效率,A、B显然需要并行运行,等到A、B都运行结束了就运行C。

zip

方法原型如下(方法有点多,参数数量不一样但功能完全一样的只列出其中一个):

static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
            ObservableSource<? extends T4> source4, ObservableSource<? extends T5> source5, ObservableSource<? extends T6> source6,
            ObservableSource<? extends T7> source7, ObservableSource<? extends T8> source8, ObservableSource<? extends T9> source9,
            Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper);
static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
            boolean delayError, int bufferSize, ObservableSource<? extends T>... sources);
static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
            Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize);     

zip将多个Observable的事件组合在一起,然后再依次发射。这话不是很容易理解,我们先来看下例子:

Observable.zip(
        Observable.create((ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
        }),
        Observable.create((ObservableEmitter<Integer> emitter) -> {
            emitter.onNext(1);
            emitter.onNext(2);
        }),
        (String t1, Integer t2) -> new Pair<String, Integer>(t1, t2)
).subscribe((Pair<String, Integer> result) -> log("onNext:" + result.first + result.second));

日志输出如下:

onNext:A1
onNext:B2

这里我们使用的是3个参数的zip方法,前面2个参数传入Observable实例,以下简称O1和O2,第3个参数传入BiFunction实例。BiFunction只有一个apply方法,apply方法拦截O1、O2的事件发射,参数1表示O1发射的事件,参数2表示O2发射的事件,返回值是两个事件的打包(打包类型自已定),apply方法的作用就是进行事件打包。打包后的事件会依次发射给Observer,且打包必须依次一一对应,如果一方发射了m个事件,另一个方只发射了n个事件(n<m),那么最终发射给Observer的事件将只有n个。

事件加工

上面列举了创建Observable的几种方式,事实上远不止以上几种,但以上几种是比较常见,接下来我们来看下常见的事件加工。

filter

filter是事件加工中最简易懂的操作了,它拦截Observable发射出来的事件,并将其中不符合要求的事件滤掉,只发射满足要求的事件给Observer。看个例子:

Observable.create((ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
}).filter((Integer i) -> i > 1)
        .subscribe((Integer i) -> log("onNext:" + i));

日志输出如下:

onNext:2
onNext:3

map

map拦截Observable发射出来的事件,将事件转化为其他数据后再发射给Observer,示例如下:

Observable.create((ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(1);
    emitter.onNext(2);
}).map((Integer i) -> "this is " + i)
        .subscribe((String s) -> log("onNext:" + s));

日志输出如下:

onNext:this is 1
onNext:this is 2

flatMap

flatMapmap类似,都是进行数据的转化,差别在于flatMap将数据转化为Observable对象,这些Observable取代原有的Observable作为生产者提供数据,使用map中的示例修改后代码如下:

Observable.just(1, 2)
        .flatMap((Integer i) -> Observable.just("this is " + i))
        .subscribe((String s) -> log("onNext:" + s));

日志输出如下:

onNext:this is 1
onNext:this is 2

仅通过上面的例子很难了解flatMap的用处,因为map也可以做到,其实flatMap适合处理更为复杂的数据,如多重列表。假设有一群小朋友聚在一起玩耍,每个人都需要拿出自己的玩具,然后我们需要统计下这些玩具,代码如下:

class Toy {
    String name;
    Toy(String name) {
        this.name = name;
    }
}

class Kid {
    List<Toy> toys;
}

List<Kid> kids() {
    List<Kid> kids = new LinkedList<>();
    Kid kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("熊大"));
    kid.toys.add(new Toy("熊二"));
    kids.add(kid);
    kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("水枪"));
    kids.add(kid);
    kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("足球"));
    kid.toys.add(new Toy("扭扭车"));
    kids.add(kid);
    return kids;
}

void test() {
    Observable.fromIterable(kids())
            .flatMap((Kid kid) -> Observable.fromIterable(kid.toys))
            .subscribe((Toy toy) -> log("onNext:" + toy.name));
}

执行test方法输出日志如下:

onNext:熊大
onNext:熊二
onNext:水枪
onNext:足球
onNext:扭扭车

concatMap

concatMapflatMap的作用类似,差别在于前者输出的数据顺序与原始数据保持一致(使用concat),而后者不保证(使用merge),我们来看个例子:

Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5)
        .flatMap((Integer i) -> {
            if(i == 3) {
                return Observable.just(i).delay(1, TimeUnit.SECONDS);
            } else {
                return Observable.just(i);
            }
        });
Observable<Integer> o2 = Observable.just(-1, -2, -3, -4, -5)
        .concatMap((Integer i) -> {
            if(i == -3) {
                return Observable.just(i).delay(1, TimeUnit.SECONDS);
            } else {
                return Observable.just(i);
            }
        });
Observable.concat(o1, o2).subscribe((Integer i) -> log("onNext:" + i));

日志输出如下:

onNext:1
onNext:2
onNext:4
onNext:5
onNext:3
onNext:-1
onNext:-2
onNext:-3
onNext:-4
onNext:-5
上一篇下一篇

猜你喜欢

热点阅读