RxJava2主题v5

2017-07-17  本文已影响50人  keyboard3

RxJava和java8的Stream类似 都是对数据流进行操作
基于rxJava版本rxjava2-2.0.1

Rxjava常用操作符

创建

create

通过创建回调函数自定义subscribe发送

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ",");
            }
        });
//结果
1,2,3,

just

将单个item作为输入,然后单项发送给subscribe

        Observable.just(1, 2, 3).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ",");
            }
        });
//结果
1,2,3,

from

将数组作为输入,然后依次单个发送给subscribe

        Observable.from(Arrays.asList(1, 2, 3)).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ",");
            }
        });
//结果
1,2,3,

Empty/Never/Throw

Empty:创建一个不发射任何数据但是正常终止的Observable(调用complete)
Never:创建一个不发射数据也不终止的Observable(不调用complete)
Throw:创建一个不发射数据以一个错误终止的Observable(调用error)

        System.out.println("\nEmpty:");
        Observable.empty().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.print("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.print("onError");
            }

            @Override
            public void onNext(Object o) {
                System.out.print("onNext");
            }
        });
        System.out.println("\nNever:");
        Observable.never().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.print("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.print("onError");
            }

            @Override
            public void onNext(Object o) {
                System.out.print("onNext");
            }
        });
//结果
Empty:
onCompleted
Never:

Interval

创建一个按固定时间间隔发射整数序列的Observable(异步的)
Timer效果与其类似,不建议使用

       Observable.interval(1, TimeUnit.MILLISECONDS).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.print("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.print("onError");
            }

            @Override
            public void onNext(Object o) {
                System.out.print("onNext");
            }
        });
//结果 每隔一秒生成一个next

Range

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

        Observable.range(7, 3).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ",");
            }
        });
//结果
7,8,9,

repeat

重复数据发送n遍
repeatWhen指定条件才重复发送

        Observable.just(1, 2)
                .repeat(2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer + " ");
                    }
                });
//结果
1 2 1 2 

转换

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据集合。(感觉应用场景:将数组按照拆分成大小相同的多个子数组)

        Observable.range(2, 6).buffer(3)
                .subscribe(new Subscriber<List<Integer>>() {
            @Override
            public void onCompleted() {
                System.out.printf("onCompleted\n");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.printf(integers.toString());
            }
        });
//结果
[2, 3, 4][5, 6, 7]onCompleted

window

定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据

        Observable.range(2, 6).window(3)
                .subscribe(new Subscriber<Observable<Integer>>() {
            @Override
            public void onCompleted() {
                System.out.printf("w-Completed\n");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Observable<Integer> integerObservable) {
                integerObservable.subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.printf("inner-Completed\n");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.printf(integer + ",");
                    }
                });
            }
        });
//结果
2,3,4,inner-Completed
5,6,7,inner-Completed
w-Completed

cast

在Observable发射数据前将所有数据类型进行强转为指定类型

        Observable.just(2.5F, 3, 5F, 0.6F).cast(Object.class).subscribe(new Action1<Object>() {
            @Override
            public void call(Object s) {
                System.out.print(s + ",");
            }
        });
//结果
2.5,3,5.0,0.6,

Map

仅仅是将数据进行转换

        Observable.range(2, 5).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer + "call";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.printf(s + ",");
            }
        });
//结果
2call,3call,4call,5call,6call,

FlatMap

将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。(就是将数据转化为Observable,实际应用场景:将数组数据项通过from转成Observable,然后通过Merge将Observables中的数据项都平展开到新的Observable中,成为了将多维数组降维的目的)

        Observable.create(new Observable.OnSubscribe<List<Integer>>() {
            @Override
            public void call(Subscriber<? super List<Integer>> subscriber) {
                subscriber.onNext(Arrays.asList(1, 2, 3, 4, 5));
            }
        }).flatMap(new Func1<List<Integer>, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(List<Integer> integers) {
                return Observable.from(integers);
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.printf(integer + ",");
            }
        });
//结果
1,2,3,4,5,

GroupBy

将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。(其实就是将数据项对应生成Key处理后转为一个Observable)

        Observable.range(0, 10).groupBy(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer % 3;
            }
        }).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
            @Override
            public void call(final GroupedObservable<Integer, Integer> item) {
                item.subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.print(item.getKey() + ":" + "onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.printf(item.getKey() + ":" + integer + ",");
                    }
                });
            }
        });
//结果
0:0,1:1,2:2,0:3,1:4,2:5,0:6,1:7,2:8,0:9,0:onCompleted1:onCompleted2:onCompleted

Scan

对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。

        Observable.range(0, 5).scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer sum, Integer item) {
                return sum + item;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.printf(integer + ",");
            }
        });
//结果
0,1,3,6,10,

过滤

Debounce

过滤特定时间内最近的数据发送,其他数据不发送。

        Observable.just(1, 2, 3)
                .debounce(400, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer);
                    }
                });
//结果:过滤400毫秒后最近的一个
integer=3

Sample

周期性发送最近的数据

Observable.interval(300, TimeUnit.MILLISECONDS)
                .sample(500, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.print("s=" + aLong + ",");
                    }
                });
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
//结果
s=0,s=2,s=4,

Distinct

过滤排除重复的数据

        Observable.just(1, 2, 4, 1, 2, 1)
                .distinct()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=1,integer=2,integer=4,

ElementAt

只过滤发送Observable中指定位置的数据

        Observable.just(1, 2, 3, 4, 5)
                .elementAt(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
ElementAt
integer=4,

Filter

定义过滤函数来过滤发送的数据

        Observable.just(1, 2, 3, 4)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer == 3||integer==4;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=3,integer=4,

First

过滤第一个数据或者符合过滤条件的第一个数据

        Observable.just(1, 2, 3, 4)
                .first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=2,

Last

过滤最后一个数据或者符合过滤条件的最后一个数据

        Observable.just(1, 2, 3, 4)
                .last(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=4,

IgnoreElements

忽略不让原Observable所有数据发送,但允许发送终止通知。如onError和onComplete

        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        subscriber.onNext(1);
                        subscriber.onCompleted();
                        subscriber.onNext(2);
                        subscriber.onNext(3);
                    }
                })
                .ignoreElements()
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.print("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
onCompleted

Skip

跳过头到第n个数据发送

        Observable.range(0, 10)
                .skip(5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=5,integer=6,integer=7,integer=8,integer=9,

SkipLast

跳过从尾数到第n个数据发送

        Observable.range(0, 10)
                .skipLast(5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=0,integer=1,integer=2,integer=3,integer=4,

Take

从头开始过滤出n个数据发送

        Observable.range(0, 10)
                .take(5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=0,integer=1,integer=2,integer=3,integer=4,

TakeLast

从尾开始过滤出n个数据发送

        Observable.range(0, 10)
                .takeLast(5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print("integer=" + integer + ",");
                    }
                });
//结果
integer=5,integer=6,integer=7,integer=8,integer=9,

组合

merge

合并两个Observable的数据结果,顺序不定(猜测是异步发送)

        Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong - 5L;
                    }
                }).take(5);//抑制定时整数序列数量5个
        Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
        Observable.merge(interval1, interval2).subscribe(new Action1<Number>() {
            @Override
            public void call(Number number) {
                System.out.print(number + " ");
            }
        });
//结果
0 -5 -4 1 -3 2 -2 3 -1 4 

Concat

合并两个Observable的数据结果,顺序前后确定(同步发送,第一个发送完毕才会发送第二个)

        Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong - 5L;
                    }
                }).take(5);//抑制定时整数序列数量5个
        Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
        Observable.concat(interval1, interval2).subscribe(new Action1<Number>() {
            @Override
            public void call(Number number) {
                System.out.print(number + " ");
            }
        });
//结果
-5 -4 -3 -2 -1 0 1 2 3 4 

startWith

在observable的数据源前面插入数据

        Observable<Integer> range = Observable.range(0, 10);
        range.startWith(-2, -1).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + " ");
            }
        });
//结果
-2 -1 0 1 2 3 4 5 6 7 8 9 

zip

合并Observables的数据,使用发射的顺序作为合并的标记
Javadoc: zip(Iterable,FuncN)

Javadoc: zip(Observable,FuncN)

Javadoc: zip(Observable,Observable,Func2) (最多可以有九个Observables参数)

        Observable<Long> interval5 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong - 5L;
                    }
                }).take(6);//抑制定时整数序列数量5个
        Observable<Long> interval6 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
        Observable.zip(interval5, interval6, new Func2<Long, Long, String>() {
            @Override
            public String call(Long aLong, Long aLong2) {
                return aLong + "~" + aLong2;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String item) {
                System.out.print(item + " ");
            }
        });
//结果
-5~0 -4~1 -3~2 -2~3 -1~4 

CombineLatest

根据每个Observable的发送时间作为合并标记,任何一个Observable数据发送之后就和其他的Observable最近发送的数据进行合并。

        Observable<Long> interval7 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong - 5L;
                    }
                }).take(6);//抑制定时整数序列数量5个
        Observable<Long> interval8 = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
        Observable.combineLatest(interval7, interval8, new Func2<Long, Long, String>() {
            @Override
            public String call(Long aLong, Long aLong2) {
                return aLong + "~" + aLong2;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String item) {
                System.out.print(item + " ");
            }
        });
//结果
-4~0 -3~0 -3~1 -2~1 -1~1 -1~2 0~2 0~3 0~4 

join

合并的两个Observable的数据,合并标记基于对Observable数据定义的生命周期,如果数据过了生命周期则就不会和其他Observable的数据进行合并,否则就排列组合形式合并。(如果不懂建议点击链接直接看图)

        Observable<Long> observable1 = Observable.interval(300, TimeUnit.MILLISECONDS).delay(400, TimeUnit.MILLISECONDS).take(5);//延迟1个多身位
        Observable<Long> observable2 = Observable.interval(300, TimeUnit.MILLISECONDS).take(5);
        observable1.join(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(100, TimeUnit.MILLISECONDS);//每个数据100毫秒生命
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(150, TimeUnit.MILLISECONDS);//每个数据150毫秒生命
            }
        }, new Func2<Long, Long, String>() {
            @Override
            public String call(Long aLong, Long aLong2) {
                return "A-" + aLong + ":B-" + aLong2;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.print(s+" ");
            }
        });
//结果
A-0:B-1 A-1:B-2 A-2:B-3 A-3:B-4 

错误处理

catch

拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new IllegalAccessException("no access"));
            }
        }).onErrorReturn(new Func1<Throwable, Integer>() {
            @Override
            public Integer call(Throwable throwable) {
                return 10001;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                System.out.print("onError");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.print("error_code:" + integer);
            }
        });
//结果
error_code:10001
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new IllegalAccessException("no access"));
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Throwable throwable) {
                return Observable.just(1, 2);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                System.out.print("onError");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.print(",value:" + integer);
            }
        });
//结果
,value:1,value:2
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(null);
            }
        }).onExceptionResumeNext(Observable.just(1, 2, 3, 4))
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("onError:");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.print(",value:" + integer);
                    }
                });
//结果
onError:

重试

如果原始Observable遇到错误,重新订阅它期望它能正常终止,数据会有部分重复。

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new IllegalAccessException("IllegalAccessException"));
            }
        }).retry(new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer integer, Throwable throwable) {
                System.out.println("num:" + integer + " throwable:" + throwable.getMessage());
                return !(throwable instanceof IllegalAccessException);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:" + e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {

            }
        });
//结果
num:1 throwable:IllegalAccessException
onError:IllegalAccessException
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("subscribing");
                subscriber.onError(new RuntimeException("always fails"));
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                    @Override
                    public Integer call(Throwable throwable, Integer integer) {
                        return integer;
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer integer) {
                        System.out.println("delay retry by " + integer + " second(s)");
                        return Observable.timer(integer, TimeUnit.SECONDS);
                    }
                });
            }
        }).toBlocking().forEach(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("foreach:"+o);
            }
        });
//结果
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing

除了这些还有辅助、条件和布尔、算术和聚合、连接、转换、操作符决策树操作符
其他操作符太零散了,直接参考下面
中文翻译
官网-操作符介绍
操作符doc文档

上一篇下一篇

猜你喜欢

热点阅读