Rxjava操作符大全
RxJava操作符图谱
![](https://img.haomeiwen.com/i10339856/d7820326a140706c.jpg)
创建操作符
create
完整创建1个被观察者对象(Observable)
just
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:直接发送 传入的事件
快速创建 被观察者对象(Observable) & 发送10个以下事件
from
fromeArray
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:直接发送 传入的数组数据
将数组元素一次发射出,可以用来遍历数组
fromIterable
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:直接发送 传入的集合List数据
同上,可用来遍历集合
发送事件
下列方法一般用于测试使用
<-- empty() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即观察者接收后会直接调用onCompleted()
<-- error() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()
<-- never() -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
延时操作符
- 定时操作:在经过了x秒后,需要自动执行y操作
- 周期性操作:每隔x秒后,需要自动执行y操作
delay
使得被观察者延迟一段时间再发送事件
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)
// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延迟时间 & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟
defer
直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
- 通过 Observable工厂方法创建被观察者对象(Observable)
- 每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
<-- 1. 第1次对i赋值 ->>
Integer i = 10;
// 2. 通过defer 定义被观察者对象
// 注:此时被观察者对象还没创建
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
<-- 2. 第2次对i赋值 ->>
i = 15;
<-- 3. 观察者开始订阅 ->>
// 注:此时,才会调用defer()创建被观察者对象(Observable)
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整数是"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
timer
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
timer操作符默认运行在一个新线程上
也可自定义线程调度器(第3个参数):timer(long, TimeUnit, Scheduler)
interval
-
快速创建1个被观察者对象(Observable)
-
发送事件的特点:每隔指定时间就发送事件
/** * initialDelay 初始延时时间 * unit 时间单位 * period 间隔时间 * scheduler 线程调度器 */ public static Observable<Long> interval(long interval, TimeUnit unit) { return interval(interval, interval, unit, Schedulers.computation()); } public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) { return interval(interval, interval, unit, scheduler); } public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) { return interval(initialDelay, period, unit, Schedulers.computation()); }
range/rangeLong
-
快速创建1个被观察者对象(Observable)
-
发送事件的特点:连续发送1个事件序列,可指定范围
/** * start 起始数字 * count 数量 */ public static Observable<Integer> range(int start, int count)
过滤操作符
-
take, takeFirst, takeLast
-
skip, skipFirst, skipLast
-
first
-
last
-
firstOrDefault, lastOrDefault (只发射最后一项(或者满足某个条件的最后一项)数据,可以指定默认值。)
// 跳过前面几项 public final Observable<T> skip(int count) // 跳过前面的时间,之后产生的数据提交 public final Observable<T> skip(long time, TimeUnit unit) // skipLast和skip相反,跳过后面的几项。 // 忽略最后时间单位内产生的数据 skipLast(long time,TimeUnit) // 并不是娶第n个,而是取前面n个数据 take(n) // 是在制定时间内取数据,如果超过了这个时间源Observable产生的数据将作废 take(long time, TimeUnit unit)
takeFirst操作符和first操作符类似,取满足条件的第一个
区别:first取不到要抛异常,takeFirst不会takeLast操作符与last操作符相似。区别在于,如果取不到满足条件的值,last将抛出异常
filter
过滤数据,不满足条件的数据将被过滤不发射。
filter(Fun) 自定义过滤条件
ofType
过滤指定类型的数据
Observable.just(1,2,"3")
.ofType(Integer.class)
.subscribe(item -> Log.d("JG",item.toString()));
elementAt/elementAtOrDefault/elementAtOrError
发射某一项数据,如果超过了范围可以指定默认值。内部通过OperatorElementAt过滤。
Observable.just(3,4,5,6)
.elementAt(2)
.subscribe(item->Log.d("JG",item.toString())); //5
firstElement/lastElement
仅选取第1个元素 / 最后一个元素
ignoreElements
丢弃所有数据,只发射错误或正常终止的通知。内部通过OperatorIgnoreElements实现。
distinct
过滤重复数据,内部通过OperatorDistinct实现。
distinctUntilChanged
过滤掉连续重复的数据。内部通过OperatorDistinctUntilChanged实现
Observable.just(3,4,5,6,3,3,4,9)
.distinctUntilChanged()
.subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
timeout
如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。
Debounce/throtleWithTimeout
根据你指定的时间间隔进行限流
发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
条件操作符
single/singleOrDefault
检测源Observable产生的数据项是否只有一个,否则报错
onError()
java.lang.IllegalArgumentException: Sequence contains too many elements
all
all操作符接收一个函数参数,创建并返回一个单布尔值的Observable,
如果原Observable正常终止并且每一项数据都满足条件,就返回true,
如果原Observable的任何一项数据不满足条件或者非正常终止就返回False。
判断所有的数据项是否满足某个条件,内部通过OperatorAll实现。
amb/ambWith
amb操作符对于给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。
当你传递多个Observable给amb操作符时,该操作符只发射其中一个Observable的数据和通知:首先发送通知给amb操作符的的那个Observable,不管发射的是一项数据还是一个onError或onCompleted通知,amb将忽略和丢弃其它所有Observables的发射物。
amb(T o1, T ... o2)(可接受2到9个参数)
给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他Observable将会被忽略。
contains
contains操作符将接收一个特定的值作为一个参数,判定原Observable是否发射该值,若已发射,则创建并返回的Observable将发射true,否则发射false。
判断在发射的所有数据项中是否包含指定的数据,内部调用的其实是exists
contains操作符默认不在任何特定的调度器上执行。
可用来判断Observable发射的值中是否包含该值。
exists
exists操作符类似与contains操作符,不同的是,其接受一个函数参数,在函数中,对原Observable发射的数据,设定比对条件并做判断。若任何一项满足条件就创建并返回一个发射true的Observable,否则返回一个发射false的Observable。
该操作符默认不在任何特定的调度器上执行。
判断是否存在数据项满足某个条件。内部通过OperatorAny实现。
isEmpty
isEmpty操作符用于判定原始Observable是否没有发射任何数据。若原Observable未发射任何数据,创建创建并返回一个发射true的Observable,否则返回一个发射false的Observable。
isEmpty操作符默认不在任何特定的调度器上执行。
可以用来判断是否没有数据发射。
defaultIfEmpty
defaultIfEmpty操作接受一个备用数据,在原Observable没有发射任何数据正常终止(以onCompletedd的形式),该操作符以备用数据创建一个Observable并将数据发射出去。
RxJava将这个操作符实现为defaultIfEmpty。它默认不在任何特定的调度器上执行。
switchIfEmpty
如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。
如果原始Observable正常终止后仍然没有发射任何数据
defaultIfEmpty使用默认值发射,switchIfEmpty使用默认Observable发射
sequenceEqual
sequenceEqual(Observable,Observable,Func2)变体接收两个Observable参数和一个函数参数,在函数参数中,可以比较两个参数是否相同。
该操作符默认不在任何特定的调度器上执行。
用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
skipUntil
skipUntil操作符在观察者订阅原Observable时,该操作符将是忽略原Observable的发射的数据,直到第二个Observable发射了一项数据那一刻,它才 开始发射原Observable发射的数据。
该操作符默认不在任何特定的调度器上执行。
skipWhile
skipWhile操作符丢弃原Observable发射的数据,直到发射的数据不满足一个指定的条件,才开始发射原Observable发射的数据。
在观察者订阅原Observable时,skipWhile操作符将忽略原Observable的发射物,直到你指定的某个条件变为false时,它开始发射原Observable发射的数据。
skipWhile操作符默认不在任何特定的调度器上执行。
takeUntil
takeUntil操作符与skipUntil操作符作用相反,当第二个Observable发射了一项数据或者终止时,丢弃原Observable发射的任何数据。
takeUntil(Func1)变体接受一个函数参数,当满足条件时终止发射数据。
takeWhile
takeWhile操作符与skipWhile操作符作用相反。在观察者订阅原Observable时,takeWhile创建并返回原Oservable的镜像Observable,暂命名为_observable,发射原Observable发射的数据。当你指定的某个条件变为false时,_observable发射onCompleted终止通知。
takeWhile操作符默认不在任何特定的调度器上执行。
变换操作符
map
对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件
即,将被观察者发送的事件转换为任意的类型事件。
如果是list,可对list的每个元素进行类型转换,最后tolist发射转换后的list。
flatmap
对Observable发射的数据都应用(apply)一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送(emit)合并的结果。 flatMap和map操作符很相像,flatMap发送的是合并后的Observables,map操作符发送的是应用函数后返回的结果集
将原Observable发射的每个数据转换为新的Observable,发射每一个转换的Observable
新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关
concatMap
作用同flatMap
与flatMap的区别是,新合并生成的事件序列顺序是有序的
switchMap
当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
cast
cast操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map的一个特殊版本
所相互转换的类之间需要存在某种关系,如继承、实现
concat
组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
按发送顺序串行执行
merge
组合多个被观察者一起发送数据,合并后 按时间线并行执行
区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
并行执行
zip
合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送
事件组合方式 = 严格按照原先事件序列 进行对位合并
最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
reduce
把被观察者需要发送的事件聚合成1个事件 & 发送
聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
自定义聚合条件,前2个数据聚合得到结果与第三个数据再聚合。以此类推...
collect
将被观察者Observable发送的数据事件收集到一个数据结构里
Observable.just(1, 2, 3, 4)
.collect(new Func0<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() {
//创建收集容器
return new ArrayList<>();
}
}, new Action2<ArrayList<Integer>, Integer>() {
@Override
public void call(ArrayList<Integer> list1, Integer integer) {
//开始收集每一项数据
list1.add(integer);
}
}).subscribe(new Action1<ArrayList<Integer>>() {
@Override
public void call(ArrayList<Integer> integers) {
//得到收集后的数据
}
});
startWith
在一个被观察者发送事件前,追加发送一些数据或是一个新的被观察者
//源码是通过concat实现,在前面追加一个Observable
public final Observable<T> startWith(Observable<T> values) {
return concat(values, this);
}
compose
其他操作符
retry
重试,即当出现错误时,让被观察者(Observable)重新发射数据
retryUntil
出现错误后,判断是否需要重新发送数据
retryWhen
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
repeat
无条件地、重复发送 被观察者事件
具备重载方法,可设置重复创建次数
repeatWhen
有条件地、重复发送 被观察者事件
count
统计被观察者发送事件的数量