RxJava部分操作符解析
今天我们来看部分RxJava相关的知识,主要是上一篇RxJava内存泄漏的一种解决方案提到的开源框架RxLifecycle
里面会涉及到的知识点,有下面几个:
1 Subject
2 takeUntil
3 filter
4 compose
1.Subject
从代码可以看出来Subject既可以当观察者也可以当被观察者。
public abstract class Subject<T> extends Observable<T> implements Observer<T>
所以可以在生命周期中通过Subject发送事件然后又自己接收,从而根据事件类型做相应的操作。
Subject总共有四种类型
1 AsyncSubject
2 BehaviorSubject
3 PublishSubject
4 ReplaySubject
今天我们就说下第二种类型BehaviorSubject
,它可以给订阅者发送订阅前最近的事件和订阅后发送的事件:
图中橙色的就是订阅前最近发送的事件,在订阅后也可以收到。文字解释始终太苍白,我们来看下代码:
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Timber.tag(TAG).d("running num : " + integer);
}
});
behaviorSubject.onNext(3);
behaviorSubject.onNext(4);
上面代码运行结果就是收到2, 3,4
behaviorSubject.PNG
2.takeUntil
这是一个操作符,可以这样用
AObservable.takeUntil(BObservable)
可以AObservable监听另外一个BObservable,如果BObservable开始发送数据,AObservable就不再发送数据。
看一下官方的图片解释,B发送0数据后,A就停止发送数据了。
talk is cheap, show me the code
:
Observable.interval(1, TimeUnit.SECONDS).
subscribeOn(Schedulers.io()).
takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).d("running num : " + num);
}
});
上面代码的意思就是从0开发每隔1秒发送一个数据,5s时停止发送,看下运行结果,和我们的预期完美一致:
takeUntil Result.PNG
3.filter
filter操作符就是过滤的意思,只有事件满足过滤条件时被观察者才会发送给观察者。看下官方的解释图,很清晰明了我就不做解释了哈。
filter.PNG
看一下怎么用,这个代码的意思还是每个1s发送数据,但是会进行过滤只发送偶数,也是5秒后停止发送:
Observable.interval(1, TimeUnit.SECONDS).
subscribeOn(Schedulers.io()).
filter(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return aLong % 2 == 0;
}
}).
takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).e("running num : " + num);
}
});
上面代码的运行效果,确实是只收到了偶数。
filter result.PNG
4.compose
compose操作符是用来对Observable进行转换操作的,并且可以保证调用链不被破坏。
比如我们经常这样用:
Observable.interval(1,TimeUnit.SECONDS)
.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
这部分代码经常写,怎么进行封装呢?可能有的小伙伴立马就想到下面的方法:
private Observable composeObservable(Observable observable){
return observable.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
}
但是上面这样用就破坏了调用链了,因为你肯定得这样调用,这样就会变得怪怪的,不是Observable开头了,变成函数开头。
composeObservable(Observable.interval(1,TimeUnit.SECONDS)).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});
这个问题用compose就可以完美解决:
Observable.interval(1, TimeUnit.SECONDS).
compose(bindUntil(5)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).d("running num : " + num);
}
});
private ObservableTransformer<Long, Long> bindUntil(final long deleyTime) {
return new ObservableTransformer<Long, Long>() {
@Override
public ObservableSource<Long> apply(Observable<Long> upstream) {
return upstream.subscribeOn(Schedulers.io()).takeUntil(Observable.timer(deleyTime, TimeUnit.SECONDS));
}
};
}
5.总结
上面的内容是假定大家有一点点RxJava的知识的,没有涉及到基本的使用。本次分享可能看起来毫无章法哈,其实还是有针对目的的,就是前面提到的开源框架RxLifecycle
,这次分享就是针对里面用到的RxJava的一些知识点进行解析。RxJava的操作符挺多的,也不太可能也没必要一个个进行分析,用到的时候进行查找官方文档就可以了。
下面会用前面提到的这些知识点来自己实现一个类似于RxLifecycle
的小Demo,欢迎大家关注和点赞哈。
最后感谢@右倾倾的理解和支持哈。
以上!
欢迎关注公众号:JueCode