RxJava第二篇,操作符Operators

2018-08-10  本文已影响0人  Man不经心

操作符(Operators):

其实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物,以便于我们操作数据流。
按照其作用具体可分为以下几类:

创建

// Creation operators: just(...) wraps fixed values, range(...) produces a run of integers.
Observable.just("hello world"); // an Observable holding the single string "hello world"
        Observable.just(1, 2, 3, 4); // an Observable holding the integers 1, 2, 3, 4 in order

        // Subscribe to range(0, 5) and print every integer it delivers.
        Observable.range(0, 5)
                .subscribe(value -> System.out.println(value));

过滤(filter与distinct)

    /**
     * Demonstrates the filtering operators {@code filter} and {@code distinct}.
     * Every value that reaches a subscriber is printed to standard output.
     */
    private static void filterAndDistinct() {
        // filter: keep only the values of range(0, 10) that are divisible by 3
        Observable.range(0, 10)
                .filter(value -> value % 3 == 0)
                .subscribe(value -> System.out.println(value));

        // distinct: de-duplicate the values
        Observable.just(1, 1, 2, 3, 4, 5, 6, 5, 2)
                .distinct()
                .subscribe(value -> System.out.println(value));

        // distinct + filter: de-duplicate first, then keep only the even values
        Observable.just(1, 1, 2, 3, 4, 5, 6, 5, 2)
                .distinct()
                .filter(value -> value % 2 == 0)
                .subscribe(value -> System.out.println(value));
    }

变换(map与flatMap)

    /**
     * Demonstrates the transforming operators {@code map} and {@code flatMap}.
     * Every value that reaches a subscriber is printed to standard output.
     */
    private static void mapAndFlatmap() {
        // map: turn each integer n into the string "n^2 = <n*n>"
        Observable.range(0, 5)
                .map(n -> n + "^2 = " + n * n)
                .subscribe(line -> System.out.println(line));

        // flatMap: replace each array with an Observable of its elements,
        // so the subscriber receives individual integers instead of arrays
        Integer[] first = {1, 2, 3, 4};
        Integer[] second = {5, 6, 7};
        Integer[] third = {8, 9, 0};
        Observable.just(first, second, third)
                .flatMap(array -> Observable.fromArray(array))
                .subscribe(value -> System.out.println(value));
    }

组合(mergeWith与concatWith)

 /** Demonstrates the combining operators {@code mergeWith} and {@code concatWith}. */
 private static void mergeWithAndConcatWith() {
        Integer[] extra = {4, 5, 6, 7, 8};

        // mergeWith: combine the first source with the array-backed source
        Observable.just(1, 2, 3, 4, 5)
                .mergeWith(Observable.fromArray(extra))
                .subscribe(value -> System.out.println(value));

        // concatWith: same two sources, joined with concatWith instead
        Observable.just(1, 2, 3, 4, 5)
                .concatWith(Observable.fromArray(extra))
                .subscribe(value -> System.out.println(value));
    }

聚合(zipWith)

    /**
     * Demonstrates {@code zipWith}: each integer is paired with a colour name and the
     * two are concatenated into one string, which is then printed.
     */
    private static void zipWith() {
        String[] names = {"红", "橙", "黄", "绿", "蓝", "靛", "紫"};
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
                // the zipper receives one value from each source and returns the combined result
                .zipWith(Observable.fromArray(names), (number, name) -> number + name)
                .subscribe(pair -> System.out.println(pair));
    }

zipWith需要接收两个参数:
一个是可观察对象,
另一个是聚合函数接口BiFunction。这个接口有三个泛型,分别为第一个可观察对象发射的数据类型、第二个可观察对象发射的数据类型,以及经过聚合函数apply处理后返回的数据类型。当其中一个可观察对象发射完毕时聚合即结束,因此上例中多出的整数8不会被输出。

链式使用

    /**
     * Chains the operators shown above into a single pipeline:
     * flatMap, mergeWith, concatWith, distinct, filter, map and zipWith.
     * Each resulting string is printed to standard output.
     */
    private static void allOperators() {
        Integer[] nums1 = {1, 3, 7, 8, 9};
        Integer[] nums2 = {3, 4, 5, 6};
        String[] names = {"红", "橙", "黄", "绿", "蓝", "靛", "紫"};

        Observable.just(nums1)
                // unpack the array into individual integers
                .flatMap(array -> Observable.fromArray(array))
                .mergeWith(Observable.fromArray(nums2))
                .concatWith(Observable.just(1, 2))
                // de-duplicate, then keep only the values below 5
                .distinct()
                .filter(value -> value < 5)
                // turn each integer into a "n:" label
                .map(value -> value + ":")
                // pair each label with a colour name and join them
                .zipWith(Observable.fromArray(names), (label, name) -> label + name)
                .subscribe(line -> System.out.println(line));
    }

git目录
RxJava第一篇,RxJava入门
RxJava第三篇,调度器Scheduler

上一篇下一篇

猜你喜欢

热点阅读