Rxjava(二)之创建操作符与转换操作符

2021-01-25  本文已影响0人  梦星夜雨

前言

Rxjava之所以如此受欢迎,与其强大的操作符是息息相关的。它几乎能完成所有的功能需求。下面我们开始介绍常见的操作符。

创建型操作符

常见的创建型操作符有,create、just、fromArray、empty、range。
对于create我们这里就不多做介绍了。

just操作符

Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG,"accept: "+integer);
                    }
                });

just操作符可以在内部发射任意数量相同类型的元素给观察者,通过分析源码我们可以知道其实内部调用的是fromArray操作符。

fromArray操作符

String[] items = {"张三","李四","王五"};
Observable.fromArray(items).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        Log.d(TAG,"accept: "+str);
                    }
                });

fromArray操作符内部通过发射一个数集对象给观察者。

empty操作符

Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                Log.d(TAG,"onNext: "+o);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG,"onComplete");
            }
        });
onSubscribe
onComplete

empty操作符内部自己发射,下游默认是Object,无法发送有值事件,只会发送onComplete。

range操作符

Observable.range(30,5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG,"accpt: "+integer);
            }
        });
accpt: 30
accpt: 31
accpt: 32
accpt: 33
accpt: 34

range操作符内部自己发射,从start(参数一)开始,发射count(参数二)个,每次加一。

变换操作符

变换操作符是对被观察者发送的事件进行一定的加工处理(转换)操作,然后再发送给被观察者。常见的变换操作符有map、flatMap、concatMap、groupBy、buffer。
map操作符

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(2);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                Log.d(TAG,"apply: "+integer);
                return "["+integer+"}";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                Log.d(TAG,"accpt: "+str);
            }
        });
apply: 2
accpt: {2}

可以看到,被观察者发送的是一个int型的数据,但我们通过map操作符,增加了自己的逻辑,然后返回了一个String类型的数据给观察者。

flatMap操作符

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("张三");
                e.onNext("李四");
                e.onNext("王五");
            }
        }).flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String str) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add(str + " 下标:" + (1 + i));
                }
                return Observable.fromIterable(list).delay(5, TimeUnit.SECONDS); // 创建型操作符,创建被观察者

            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                Log.d(TAG,"accpt: "+str);
            }
        });
accpt: 张三 下标:1
accpt: 张三 下标:2
accpt: 张三 下标:3
accpt: 李四 下标:1
accpt: 王五 下标:1
accpt: 王五 下标:2
accpt: 王五 下标:3
accpt: 李四 下标:2
accpt: 李四 下标:3

依旧是在被观察者和订阅观察者之间添加flatMap操作符,我们可以看到flatMap操作符需要返回一个ObservableSource对象,而ObservableSource实际上是一个接口,并且Observable实现了这个接口,那么我们可以返回一个Observable对象,我们可以在apply方法中添加自己的操作。
这里我们为什么要发送这么对消息呢,因为我们要证明flatMap操作符是不排序的,由打印日志可以得到证明。

concatMap操作符
用法上和flatMap一样,它们的唯一区别就是concatMap操作符是排序的,至于代码和应用我这里就不赘述了。

groupBy操作符

Observable.just(1, 2, 100, 200, 3000, 4000).groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                if (integer < 10) {
                    return "small";
                } else if (integer >= 10 && integer < 1000) {
                    return "medium";
                } else {
                    return "big";
                }
            }
        }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
            @Override
            public void accept(final GroupedObservable<String, Integer> groupedObservable) throws Exception {
                groupedObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + groupedObservable.getKey()+"---"+integer);
                    }
                });
            }
        });
 accept: small---1
 accept: small---2
 accept: medium---100
 accept: medium---200
 accept: big---3000
 accept: big---4000

groupBy操作符是对被观察者发送的数据进行分组,然后将分组后的数据传递给观察者。这里需要注意的是,在观察者中得到的是GroupedObservable对象,若想得到原本被观察者的值,则需要再一次进行封装。

buffer操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i = 0;i<100;i++){
                    e.onNext(i);
                }
            }
        }).buffer(20).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.d(TAG,"accept: "+ integers);
            }
        });
accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

buffer操作符是对被观察者发送的数据进行分组,然后将分组完成后的数据封装成一个List发送给观察者。

上一篇下一篇

猜你喜欢

热点阅读