RxJava操作符

2020-12-08  本文已影响0人  慕尼黑凌晨四点

RxJava操作符

官网地址:http://reactivex.io/documentation/operators.html
一个很详细的博客:https://juejin.cn/post/6844903885245513741

按照操作符的类型可分为:创建操作符,变换操作符,过滤操作符,组合操作符,错误操作符,辅助操作符,条件和布尔操作符,算术和聚合操作符 及 连接操作符 等。

RxJava3 中 Action<T>被弃用了,改为Consumer<T>

创建操作符

Observable.create(new Observable.OnSubscribe<T>(){
    @Overrode
    public void call(Subscriber<? super T>)  sub){
        sub.onNext("0");
        sub.onNext("1");
        sub.onCompleted();
    }
})
String list = {"0","1","2"}
Observable.from(list);
Observable.just("0","1","2")
Observable<Long> observable = Observable.interval(3, TimeUnit.SECONDS, Schedulers.trampoline());
//, Schedulers.trampoline() 指定一个线程,默认新线程(NEW_THREAD),这种情况下主线程结束,程序就结束了。
observable.subscribe(new Consumer<Long>() {  //泛型只能是Long类型,下同
    public void accept(Long aLong) throws Throwable {
         //TODO 
    }
});

<img src="C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20201207122725363.png" alt="image-20201207122725363" style="zoom:50%;" />

Observable<Integer> observable = Observable.range(0,5);//左闭右开
Observable<Integer> observable = Observable.range(0,5)
        .repeat(2)//重复两次

变换操作符

Observable<Integer> observable = Observable.range(0,5);
Observable<String> map = observable.map(new Function<Integer, String>() {
    public String apply(Integer integer) throws Throwable {
        return integer.toString();
    }
});
final Observable<Integer> observable = Observable.range(0,5);
observable.flatMap(new Function<Integer, ObservableSource<?>>() {
    public ObservableSource<?> apply(Integer integer) throws Throwable {
        return Observable.just("0","1","haha");
    }
}).cast(String.class);

FlatMap类似。区别在于:flatMap不保证顺序,当上一个操作未执行时,下一个操作可能已经发射。

concatMap保证了顺序,必须等到上一个操作发射完,才会发射下一个。

concatMapflatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。总之一句一话,他们的区别在于:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。 ---出自这里

Observable
        .interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
        .flatMapIterable( aLong -> Arrays.asList("a", "integer") )
        .subscribe(s -> System.out.println(s));
Observable
        .interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
        .buffer(3)//集满3个再一并发射
Person p1 = new Person("hh",13);
Person p2 = new Person("ff",13);
Person p3 = new Person("gg",23);
Person p4 = new Person("aa",13);
Person p5 = new Person("bb",33);
@NonNull Observable<GroupedObservable<Integer, Person>> group = Observable.just(p1, p2, p3, p4, p5)
    .groupBy(p -> p.age);//按年龄进行排序
Observable.concat(group).subscribe(new Consumer<Person>() {
    @Override
    public void accept(Person p) throws Throwable {
        System.out.println(">>>>>>"+p.name+","+p.age);
    }
});
//Result ----------------------按年龄进行排序结果----
>>>>>>hh,13
>>>>>>ff,13
>>>>>>aa,13
>>>>>>gg,23
>>>>>>bb,33

过滤操作符

Observable.just(1, 2, 3, 4,7, 5)
        .filter(i -> i>2)//只保留大于2的元素
Observable.interval(1000,TimeUnit.MILLISECONDS, Schedulers.trampoline())
        .throttleFirst(2000,TimeUnit.MILLISECONDS)
        .subscribe(l -> System.out.println(l));
//-------------------结果-----------
//  0,3,6,9, .........
//2000改为1900则是 2,4,6,8 ...自己体会
image-20201208100601354.png

通过时间来限流。源Observable每次发射出来一个数据后都会进行计时,如果再设定好的时间结束前Observable有新的数据发射出来,这个数据就会被丢弃,同时开始重新计时。

组合操作

Observable.just(3,4,5).startWith(1,2)
Observable.merge(obs1,obs2)
Observable.concat(obs1,obs2)
@NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L);
@NonNull Observable<String> obs2 = Observable.just("1L","2L","3L");
Observable.zip(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l));
----------------------
1,1L
2,2L
3,3L
@NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L);
@NonNull Observable<String> obs2 = Observable.just("1L","2L","3L");
Observable.combineLatest(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l));
------------------------
3,1L
3,2L
3,3L

辅助操作符

错误处理操作符

RxJavaPlugins.setErrorHandler(e -> { });
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    for (int i = 0; i < 20; i++) {
        if (i>2{
            emitter.onError(new Throwable("ERROR"));
        }
        emitter.onNext(i);
    }
    emitter.onComplete();
})onErrorReturn(t -> 100)//错误的时候直接返回数据100
0
1
2
100
complete

布尔操作符

Observable.just(1,2,3,4,5)
        .all(integer -> {
            System.out.println(integer);
            return integer<3;
        })
        .subscribe(aBoolean -> System.out.println(aBoolean));
//-----------------结果------
1
2
3
false

条件操作符

转换操作符

将Observable转换成另一个对象或数据结构。比map更简介。

Observable.just(1,2,3,4,5)
        .toList()
        .subscribe(list -> {System.out.println(list.size())});//{1,2,3.4,5}
Observable.just(1,2,3,4,5)
        .toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return integer.toString();
            }
        })
        .subscribe(map -> System.out.println(map.toString()));
//-------------------------result---------------
//.subscribe(map -> System.out.println(map.toString()));
//返回的是 {("1",1),("2",2) ...("5",5)}这个map

未完待续。。。

ps: 其实还有很多没写,太多了,学不动了😵

上一篇下一篇

猜你喜欢

热点阅读