RxJava

RxJava<第十一篇>:变换操作符

2019-03-15  本文已影响3人  NoBugException

(1)map

从发射数据到接收数据之间的数据变换。

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "我是变换过后的" + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

以上代码的意思是,发射的数据是Integer类型的, 将Integer类型的数据经过一些处理,最后返回值是String类型。

如果以上的说法不能理解,那么就举个例子:

    new Thread(new Runnable() {
        @Override
        public void run() {
            toUp("live");
        }
    }).start();
private String toUp(String str){
    String s = str.toUpperCase();
    return s;
}

以上的列子很简单, 那么怎么写才能让代码更加优雅呢?

    Observable.just("live").map(new Function<String, String>() {
        @Override
        public String apply(String s) throws Exception {
            return s.toUpperCase();
        }
    }).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

(2)flatMap

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

    List<Integer> list = Arrays.asList(1, 2, 3);

    Observable.fromIterable(list).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(final Integer integer) throws Exception {
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {

                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                    emitter.onNext("第" + integer + "个主任务的第1个分支任务");
                    emitter.onNext("第" + integer + "个主任务的第2个分支任务");
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.newThread());
            return observable;
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println(s+"---thread:"+ Thread.currentThread().getName());
        }
    });

以上代码将一个任务分成两个任务分别发射。

执行结果如下:

图片.png

如果删除subscribeOn(Schedulers.newThread())代码,那么执行结果是:

图片.png

显然, 在多线程的情况下,接收数据时,是线程不安全的,如果需要线程安全,那么需要使用ConcatMap。

如果以上不怎么理解,那么就举一个例子吧,请问大家有没有遇到过嵌套网络请求,当post请求成功返回数据时,这时我们需要这些数据发起新的(一个或多个)post请求。

需要注意的是,如果第一个post请求成功返回时,第二个和第三个post请求都需要第一个post请求返回的参数时,那么是否考虑线程安全来决定到底使用flatMap还是ConcatMap。

我们先用一般的代码模拟一下网络请求, 如下:

public interface INR{
    void success(int result);
    void failed();
}
//模拟网络请求1
private void networkRequest1(final String s1, final INR inr){
    new Thread(new Runnable() {
        @Override
        public void run() {
            if("1".equals(s1)){
                int result = Integer.parseInt(s1);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    inr.failed();
                }
                inr.success(result);
            } else{
                inr.failed();
            }
        }
    }).start();
}
//模拟网络请求2
private void networkRequest2(final String s2, final INR inr){
    new Thread(new Runnable() {
        @Override
        public void run() {
            if(s2.equals("2")){
                int result = Integer.parseInt(s2);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    inr.failed();
                }
                inr.success(result);
            } else{
                inr.failed();
            }
        }
    }).start();
}
//模拟网络请求3
private void networkRequest3(final String s3, final INR inr){
    new Thread(new Runnable() {
        @Override
        public void run() {
            if("3".equals(s3)){
                int result = Integer.parseInt(s3);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    inr.failed();
                }
                inr.success(result);
            } else{
                inr.failed();
            }
        }
    }).start();
}
    networkRequest1("1", new INR() {
        @Override
        public void success(int result) {
            String newresult = String.valueOf(result + 1);
            networkRequest2(newresult, new INR() {
                @Override
                public void success(int result) {
                    String newresult = String.valueOf(result + 1);
                    networkRequest3(newresult, new INR() {
                        @Override
                        public void success(int result) {
                            System.out.println(String.valueOf(result));
                        }

                        @Override
                        public void failed() {

                        }
                    });
                }

                @Override
                public void failed() {

                }
            });
        }

        @Override
        public void failed() {

        }
    });

一个完整的网络嵌套请求需要写那么多代码,如果可读性很差,代码不优雅,那么Rxjava到底怎么实现呢?

    Observable.just("1").flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            int result = Integer.parseInt(s);
            Thread.sleep(3000);
            return Observable.just(result).map(new Function<Integer, String>() {

                @Override
                public String apply(Integer result) throws Exception {
                    return String.valueOf(result + 1);
                }
            });
        }
    }).flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            int result = Integer.parseInt(s);
            Thread.sleep(3000);
            return Observable.just(result).map(new Function<Integer, String>() {

                @Override
                public String apply(Integer result) throws Exception {
                    return String.valueOf(result + 1);
                }
            });
        }
    }).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

以上代码就是Rxjava的写法了,是不是很简洁呢?

接下来介绍的ConcatMap就不举例了。

(3)ConcatMap

    List<Integer> list = Arrays.asList(1, 2, 3);

    Observable.fromIterable(list).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(final Integer integer) throws Exception {
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {

                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                    emitter.onNext("第" + integer + "个主任务的第1个分支任务");
                    emitter.onNext("第" + integer + "个主任务的第2个分支任务");

                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.newThread());
            return observable;
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println(s+"---thread:"+ Thread.currentThread().getName());
        }
    });

执行结果是:

图片.png

(4)flatMapIterable

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("A");
            e.onNext("B");
        }
    }).flatMapIterable(new Function<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String s) throws Exception {
            List<String> list = Arrays.asList("A", "B", "C");
            return list;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

(5)switchMap

只发射最近发射的数据,也就是说,如果前一个任务还没完成时就开始了第二个任务,那么前一个任务将被终止。

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("A");
            e.onNext("B");
        }
    }).switchMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {

            return Observable.just(s, "---:"+s).subscribeOn(Schedulers.newThread());
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

日志效果如下

图片.png

(6)scan

sacn操作符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者

    Observable.range(2, 10).scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(String.valueOf(integer));
        }
    });

apply回掉第一个参数是上次的结算结果,第二个参数是当此的源observable的输入值

日志如下:

图片.png

(7)groupBy

将1,2,3,4分组

    Observable.just(1,2,3,4).groupBy(new Function<Integer, Integer>() {

        @Override
        public Integer apply(Integer integer) throws Exception {
            return integer % 2;
        }
    }).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
        @Override
        public void accept(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
            integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println(integerIntegerGroupedObservable.getKey() + "    "+integer);
                }
            });
        }
    });

日志如下:

图片.png

(8)buffer

字面上是缓存的意思

假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据:
缓存区1:"one", "two", "three"
缓存区2:"four", "five"

    Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
            .buffer(3)
            .subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(List<String> strings) throws Exception {
                    for (String s : strings){
                        System.out.println(s);
                    }
                    System.out.println("---------------------");
                }
            });

有5个字符串,设置缓存区为3(默认跳3个字符串)

日志效果如下:

图片.png

假如,现在有"one", "two", "three", "four", "five"这5条数据,将将要发射的数据存放到缓存区,每个缓存区是3条数据,设置偏移量为1

缓存区1:"one", "two", "three"
缓存区2: "two", "three", "four"
缓存区3: "three", "four", "five"
缓存区4:"four", "five"
缓存区5: "five"

    Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
            .buffer(3,1)
            .subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(List<String> strings) throws Exception {
                    for (String s : strings){
                        System.out.println(s);
                    }
                    System.out.println("---------------------");
                }
            });

打印日志如下:

图片.png

(9)window

设置一个window最多3条数据,将这个window封装成Observable,并将Observable发射出去。

    Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
            .window(3)
            .subscribe(new Consumer<Observable<String>>() {
                @Override
                public void accept(Observable<String> stringObservable) throws Exception {
                    System.out.println("-----------------------");
                    stringObservable.subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            });

执行效果如下

图片.png
    Observable.just("one", "two", "three", "four", "five")//创建了一个有5个数字的被观察者
            .window(3,1)
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Observable<String>>() {
                @Override
                public void accept(Observable<String> stringObservable) throws Exception {
                    System.out.println("-----------------------");
                    stringObservable.subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            });

效果如下:

图片.png

需要说明的是:

(10)cast

cast是转换操作符, 从字面上的意思是说可以实现类型的转换

    Observable.range(1, 5).cast(Integer.class).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer ss) throws Exception {
            System.out.println(ss);
        }
    });
    Observable.range(1, 5).cast(Number.class).subscribe(new Consumer<Number>() {
        @Override
        public void accept(Number ss) throws Exception {
            System.out.println(ss);
        }
    });

打印效果如下:

图片.png

如果改成String类型

    Observable.range(1, 5).cast(String.class).subscribe(new Consumer<String>() {
        @Override
        public void accept(String ss) throws Exception {
            System.out.println(ss);
        }
    });

打印效果如下:

图片.png

这是一个比较有疑问的地方。

上一篇下一篇

猜你喜欢

热点阅读