Android 深入分析Android开发经验谈Android技术知识

RXJava学习笔记(4)

2016-06-22  本文已影响242人  皮球二二

组合操作符

组合操作符一共主要有以下几个:

  1. CombineLatest
  2. Join
  3. Merge
  4. StartWith
  5. Switch
  6. Zip

zip

zip操作符一般可以用在如下场景,

  1. 一个页面有多个接口,我希望等数据全部返回出来之后一并显示
  2. 用户发图片,可能有多张,我希望用户上传完图片之后一并提示
    ......
zip

看图说话,当我们有2个或多个Observables发射的数据项时,zip操作符将严格的按照发射的顺序去将结合这些数据项,并且最终他发射出的数据数量与发射数据项最少的那个Observable的数据数量一样多。

zip操作符可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables

例子:

我这边有三个数据源,他们分别在2s\4s\6s吐数据,然后我打算将他们整合在一起

Observable o1=Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscriber.onNext("first");
    }
});

Observable o2=Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscriber.onNext("second");
    }
});

Observable o3=Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        subscriber.onNext("third");
    }
}).subscribeOn(Schedulers.io());

Observable.zip(o1, o2, o3, new Func3() {
    @Override
    public Object call(Object o, Object o2, Object o3) {
        Log.d("MainActivity", o.toString());
        Log.d("MainActivity", o2.toString());
        Log.d("MainActivity", o3.toString());
        return o.toString()+o2.toString()+o3.toString();
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
    @Override
    public void call(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

最终,这个call将会在三条数据都返回过来后再执行

06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: first
06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: second
06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: third
06-21 05:19:04.276 6306-6306/com.example.clevo.rxjavademo D/MainActivity: firstsecondthird

请看时间点是一致的

如果有多对匹配,那么则顺序打印出匹配项

Observable o4=Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        for (int i=0;i<3;i++) {
            try {
                Thread.sleep(i*3000);
                subscriber.onNext("four");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }}).subscribeOn(Schedulers.io());

Observable.zip(Observable.just("1", "2", "3", "4"), o4, new Func2() {
    @Override
    public Object call(Object o, Object o2) {
        //每结合成功一组就触发
        Log.d("MainActivity", o.toString());
        Log.d("MainActivity", o2.toString());
        return o.toString()+o2.toString();
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
    @Override
    public void call(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

看看结果

06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 1
06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:11.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 1four
06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 2
06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:17.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 2four
06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 3
06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:26.691 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 3four

与zip操作符功能差不多的还有zipWith操作符,他与zip操作符的区别在于zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

o4.zipWith(Observable.just("1", "2", "3", "4"), new Func2() {
    @Override
    public Object call(Object o, Object o2) {
        Log.d("MainActivity", o.toString());
        Log.d("MainActivity", o2.toString());
        return o.toString()+o2.toString();
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
    @Override
    public void call(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

结果与之前一致

CombineLatest

CombineLatest操作符一般可以用在如下场景

  1. 登录页面用户名密码需要同时校验,两个EditText的TextWatcher分别触发不同的Observal,以达到共同校验的效果
CombineLatest

看图说话,CombineLatest操作符与Zip操作符类似,但是区别很明显,ZIp操作符是每一个Observal都发射数据了,才会被结合成一个新的Observal,而CombineLatest是只要之前的Observal被发射过了,那么他会用这条Observal最后的那条数据,重新结合成一个新的Observal。这个从图中应该能够清晰的展现出来。

我把之前的代码改动一下

Observable.combineLatest(o1, Observable.just("1", "2", "3", "4"), new Func2() {
    @Override
    public Object call(Object o, Object o2) {
        Log.d("MainActivity", o.toString());
        Log.d("MainActivity", o2.toString());
        return o.toString()+o2.toString();
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
    @Override
    public void call(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

来看看结果

06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 1
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 2
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 3
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 4
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first1
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first2
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first3
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first4

虽然o1只发射了一条数据,但是另外一个Observal有4条数据出来,最终得到4条结果

Join

Join

恕我个人愚笨,这张图我看了好久才知道是什么意思。这个跟之前所说的结合是一个很大的区别,它带上了有效期。你可以简单的理解是,我输入了手机号,然后等待验证码,验证码有效期是1分钟,如果在1分钟内你发射数据,咱俩就可以结合了,1分钟后,你照死发,我都不会理你。
看图说话,源Observal就是第一行数据,目标Observal是第二行数据。源Observal的有效期是蓝色箭头跟黑色箭头之间部分,目标Observal有效期是黑色部分跟粉红色部分之间。所以我们看,第一个菱形过来之后,粉红色球还处于有效期内,他们结合了;土黄色球来了,菱形还在他有效期内,他们也结合了;但是青色球来了之后,菱形的有效期过了,就不能结合了。

o1.join(o3, new Func1() {
    @Override
    public Object call(Object o) {
        return Observable.timer(2, TimeUnit.SECONDS);
    }}, new Func1() {
    @Override
    public Object call(Object o) {
        return Observable.timer(2, TimeUnit.SECONDS);
    }}, new Func2() {
    @Override
    public Object call(Object o, Object o2) {
        return o.toString()+o2.toString();
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    @Override
    public void onNext(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

四个参数是什么意思,o3是目标Observal,第二个参数是源Observal的有效时间,第三个参数是目标Observal的有效时间,第四个参数是如果有合并,自行处理合并后的逻辑。这里,第二个参数的有效期只有2秒,那么算上之前的延时时间,源Observal过期2s之后目标Observal才发射数据,那么这里就不会有结合操作了

跟他类似的还有GroupJoin

GroupJoin

大体上是一致的,区别在与他是等到有效期到了,才响应,而且第四个参数入参有区别,join是你对象Observal发射出来的那个值,而GroupJoin是对象Observal发射出的Observal

o1.groupJoin(o3, new Func1() {
    @Override
    public Object call(Object o) {
        return Observable.timer(7, TimeUnit.SECONDS);
    }}, new Func1() {
    @Override
    public Object call(Object o) {
        return Observable.timer(2, TimeUnit.SECONDS);
    }}, new Func2() {
    @Override
    public Object call(final Object o, Object o2) {
        return ((Observable<String>) o2).flatMap(new Func1<String, Observable<?>>() {
            @Override
            public Observable<?> call(String s) {
                return Observable.just(s+o.toString());
            }
        });
    }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    @Override
    public void onNext(Object o) {
        ((Observable) o).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            @Override
            public void onNext(Object o) {
                Log.d("MainActivity", o.toString());
            }
        });
    }
});

算上之前的启动信息

06-22 02:25:39.927 730-20553/system_process I/ActivityManager: START u0 {act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] flg=0x10200000 cmp=com.example.clevo.rxjavademo/.MainActivity (has extras)} from uid 10039 on display 0
06-22 02:25:39.927 730-20553/system_process V/WindowManager: addAppToken: AppWindowToken{1b4d4776 token=Token{ad42a11 ActivityRecord{3912da38 u0 com.example.clevo.rxjavademo/.MainActivity t236}}} to stack=1 task=236 at 0
06-22 02:25:39.954 730-1035/system_process V/WindowManager: Adding window Window{cb3386f u0 com.example.clevo.rxjavademo/com.example.clevo.rxjavademo.MainActivity} at 3 of 9 (before Window{12c90a49 u0 Starting com.example.clevo.rxjavademo})
06-22 02:25:40.117 730-756/system_process I/ActivityManager: Displayed com.example.clevo.rxjavademo/.MainActivity: +186ms
06-22 02:25:47.938 29412-30378/com.example.clevo.rxjavademo D/MainActivity: thirdfirst

可以看到到了有效期结束的时候,顺利打印

Merge

我们在请求列表数据的时候,一般有这个需求,将本地保存的数据与网络请求到的数据合并成一条数据显示出来,这个时候,我们就需要用到Merge

Merge

从图中可以看出来,Merge只是单纯的把多条Observal合并成1条

Observable.merge(o4, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
    @Override
    public void call(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

看看结果

06-22 02:42:18.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
06-22 02:42:21.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: third
06-22 02:42:24.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
06-22 02:42:33.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four

还有一个操作符与他类似,MergeDelayError,他与Merge的区别在于,Merge在数据发射过程中如果遇到错误,会立即终止,MergeDelayError则会继续发射,直到数据发射完,才将Error传递给观察者

Observable o5=Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        for (int i=0;i<3;i++) {
            try {
                Thread.sleep((i+1)*3000);
                if (i==1) {
                    subscriber.onError(new Exception("ERROR"));
                }
                subscriber.onNext("five");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }}).subscribeOn(Schedulers.io());

Observable.mergeDelayError(o5, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    @Override
    public void onNext(Object o) {
        Log.d("MainActivity", o.toString());
    }
});

对比下结果

06-22 02:52:15.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:52:18.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: third
06-22 02:52:21.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:52:30.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:57:21.805 27254-27254/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:57:22.295 27254-27254/com.example.clevo.rxjavademo D/MainActivity: third

StartWith

这个其实没啥好说的,就是往既定的Observal里面直接加入一个Observal的内容

startWith.c.png
Observable.just("1", "2", "3").startWith("5", "6", "7").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("MainActivity", s);
    }
});

看看结果

06-22 03:36:55.080 31825-31825/? D/MainActivity: 5
06-22 03:36:55.080 31825-31825/? D/MainActivity: 6
06-22 03:36:55.080 31825-31825/? D/MainActivity: 7
06-22 03:36:55.080 31825-31825/? D/MainActivity: 1
06-22 03:36:55.080 31825-31825/? D/MainActivity: 2
06-22 03:36:55.080 31825-31825/? D/MainActivity: 3

SwitchOnNext

SwitchOnNext

如图,有一个Observable,他可以自己狂发射不同的Observal,这个时候,如果在同一个时间内存在两个或多个Observable提交结果,那么只取最后一个Observable提交的结果给观察者,这里面第二个Observal发射数据了,那么第一个Observal从黄球开始就被关闭了

private Observable<String> createObserver(final int index) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            for (int i = 1; i < index; i++) {
                subscriber.onNext(index + "-" + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).subscribeOn(Schedulers.newThread());
}

每隔1s发一条数据

Observable.switchOnNext(Observable.create(
        new Observable.OnSubscribe<Observable<String>>() {
            @Override
            public void call(Subscriber<? super Observable<String>> subscriber) {
                for (int i = 1; i < 3; i++) {
                    if (i==1) {
                        subscriber.onNext(createObserver(10));
                    }
                    else if (i==2) {
                        subscriber.onNext(createObserver(5));
                    }
                    try {
                        Thread.sleep(2100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        })).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        Log.d("MainActivity", s);
    }
});

这样,分析中我们可知,在过了4200s之后,第一条Observal就会被关闭

06-22 04:25:01.155 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-1
06-22 04:25:02.156 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-2
06-22 04:25:03.157 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-3
06-22 04:25:03.255 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-1
06-22 04:25:04.259 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-2
06-22 04:25:05.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-3
06-22 04:25:06.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-4

组合操作符我们就学习到这里了,下一篇我们学一下简单的错误处理

主要参考文章

上一篇 下一篇

猜你喜欢

热点阅读