Android Developer关于Android

Android Develop——RxJava2(二) RxJa

2017-09-11  本文已影响1917人  So_ProbuING

在RxJava2(一)教程中,已经跟着大神们学习了RxJava2的基本使用,现在我们来学习一下RxJava2很强大的操作符

Android RxJava2操作符

Map

map操作符示意
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: ");
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                String mapStr = String.valueOf(integer + 1);
                return mapStr;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

FlatMap

flatMap是一个非常强大的操作符,flatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。

flatMap

上游发送三个事件,分别是1,2,3注意它们的颜色,中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable

flatmap分解动作.jpg

上游每发送一个事件,flatMap都将创建一个新的水管,然后发送转换之后的新的事件,下游接收到的就是这些新的水管发送的数据。flatMap不能保证事件的顺序

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    String iStr = "flatMap value" + integer;
                    arrayList.add(iStr);
                }
                return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

concatMap

concatMap和flatMap的作用是一样的,它的结果是严格按照上游发送的顺序来发送的。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(11);
                e.onNext(111);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    arrayList.add("concatMap value" + i);
                }
                return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value2

Buffer

Buffer操作符会定期收集Observable的数据放进一个数据包裹,然后发射这些包裹,并不是一次发射一个值
Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据。

Buffer变体

Scan

Scan连续地对数据序列的每一项应用一个函数,然后连续发射结果
Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将这个函数的结果作为自己的第一项数据发射。将函数的结果同第二项数据一起填充给这个函数来产生自己的第二项数据。持续进行这个过程来产生剩余的数据序列。

scan.jpg
 Observable.just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {

                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

Window

Window定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口而不是每次发射一项数据

window.jpg

window和Buffer类似,但不是发射来自原始Observable的数据包,发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onComplete通知。

   Observable.range(1, 10).window(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                Log.d(TAG, "subscribeActual: ");
                observer.onNext(1);
                observer.onNext(1);
                observer.onNext(1);
            }
        }).subscribe(new Consumer<Observable<Integer>>() {
            @Override
            public void accept(Observable<Integer> integerObservable) throws Exception {
                integerObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
            }
        });

ZIP操作符

ZIP通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。按照严格的顺序应用这个函数,只发射与发射项最少的那个Observable一样多的数据

zip.jpg

从图中看到,有两个上游的水管,通过ZIP操作符,使得两个事件合并为了一个事件

zip分解动作.jpg
  //上游水管第一个事件
        Observable<Integer> observable1 = Observable.range(1, 5);
        //上游水管第二个事件
        Observable<Integer> observable2 = Observable.range(6, 10);
        //合并事件
        Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                return String.valueOf(integer + integer2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

实践

public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

zip打包

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      

Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something;                                                           
            }                                                                             
        });

基本的操作符就是这些了,以后再学习到其它的运算符再继续补充

上一篇下一篇

猜你喜欢

热点阅读