Android拾萃 - RxJava2操作符汇总
几种主要的需求:
- 直接创建一个Observable(创建操作)
- 组合多个Observable(组合操作)
- 对Observable发射的数据执行变换操作(变换操作)
- 从Observable发射的数据中取特定的值(过滤操作)
- 转发Observable的部分值(条件/布尔/过滤操作)
- 对Observable发射的数据序列求值(算术/聚合操作)
创建操作符
名称 | 解析 |
---|---|
just() | 将一个或多个对象转换成发射这个或这些对象的一个Observable |
fromArray() | 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable |
repeat() | 创建一个重复发射指定数据或数据序列的Observable |
repeatWhen() | 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据 |
create() | 使用一个函数从头创建一个Observable |
defer() | 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable |
range() | 创建一个发射指定范围的整数序列的Observable |
interval() | 创建一个按照给定的时间间隔发射整数序列的Observable |
timer() | 创建一个在给定的延时之后发射单个数据的Observable |
empty() | 创建一个什么都不做直接通知完成的Observable |
error() | 创建一个什么都不做直接通知错误的Observable |
never() | 创建一个入不发射任何数据的Observable |
变换操作符
操作符 | 解析 |
---|---|
buffer() | 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个 |
map() | 对序列的每一项都应用一个函数来变换Observable发射的数据序列 |
flatMap() , concatMap() , flatMapIterable() | 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable |
switchMap() | 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据 |
scan() | 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值 |
groupBy() | 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 |
buffer() | 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个 |
window() | 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项 |
cast() | 在发射之前强制将Observable发射的所有数据转换为指定类型 |
过滤操作符列表
方法 | 含义 |
---|---|
filter() | 过滤数据 |
takeLast() | 只发射最后的N项数据 |
last() | 只发射最后的一项数据 |
lastOrDefault() | 只发射最后的一项数据,如果Observable为空就发射默认值 |
takeLastBuffer() | 将最后的N项数据当做单个数据发射 |
skip() | 跳过开始的N项数据 |
skipLast() | 跳过最后的N项数据 |
take() | 只发射开始的N项数据 |
first() , takeFirst() | 只发射第一项数据,或者满足某种条件的第一项数据 |
firstOrDefault() | 只发射第一项数据,如果Observable为空就发射默认值 |
elementAt() | 发射第N项数据 |
elementAtOrDefault() | 发射第N项数据,如果Observable数据少于N项就发射默认值 |
sample() , throttleLast() | 定期发射Observable最近的数据 |
throttleFirst() | 定期发射Observable发射的第一项数据 |
throttleWithTimeout() , debounce() | 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作 |
timeout() | 如果在一个指定的时间段后还没发射数据,就发射一个异常 |
distinct() | 过滤掉重复数据 |
distinctUntilChanged() | 过滤掉连续重复的数据 |
ofType() | 只发射指定类型的数据 |
ignoreElements() | 丢弃所有的正常数据,只发射错误或完成通知 |
结合操作符
操作符 | 解析 |
---|---|
and() , then() , when() | 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集 |
combineLatest() | 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果 |
join() , groupJoin() | 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射 |
merge() | 将多个Observable合并为一个 |
startWith() | 在数据序列的开头增加一项数据,在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项 |
switch() | 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据 |
switchOnNext() | 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据 |
zip() | 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射 |
mergeDelayError() | 合并多个Observables,让没有错误的Observable都完成后再发射错误通知 |
错误处理操作符
用于对Observable发射的 onError 通知做出响应或者从错误中恢复,例如,你
可以:
- 吞掉这个错误,切换到一个备用的Observable继续发射数据
- 吞掉这个错误然后发射默认值
- 吞掉这个错误并立即尝试重启这个Observable
- 吞掉这个错误,在一些回退间隔后重启这个Observable
名称 | 解析 |
---|---|
onErrorResumeNext() | 指示Observable在遇到错误时发射一个数据序列 |
onErrorReturn() | 让Observable遇到错误时发射一个特殊的项并且正常终止。方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。 |
onExceptionResumeNext() | 指示Observable遇到错误时继续发射数据 |
retry() | 指示Observable遇到错误时重试 |
retryWhen() | 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable |
retryUntil() | 指示Observable遇到错误时,是否让Observable重新订阅 |
辅助操作符
用于处理Observable的操作符,例如延迟、定时等。
名称 | 解析 |
---|---|
materialize() | 将Observable转换成一个通知列表 |
dematerialize() | 将上面的结果逆转回一个Observable |
timestamp() | 给Observable发射的每个数据项添加一个时间戳 |
serialize() | 强制Observable按次序发射数据并且要求功能是完好的 |
cache() | 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者 |
observeOn() | 指定观察者观察Observable的调度器 |
subscribeOn() | 指定Observable执行任务的调度器 |
doOnEach() | 注册一个动作,对Observable发射的每个数据项使用 |
doOnCompleted() | 注册一个动作,对正常完成的Observable使用 |
doOnError() | 注册一个动作,对发生错误的Observable使用 |
doOnTerminate() | Observable终止之前会被调用,无论是正常还是异常终止 |
doOnSubscribe() | 注册一个动作,在观察者订阅时使用 |
doOnUnsubscribe() | 注册一个动作,在观察者取消订阅时使用 |
doOnNext() | 在onNext前执行 |
doAfterNext() | 在onNext之后执行 |
doAfterTerminate | 终止发送时候调用 |
doOnLifecycle | 可以在订阅之后 设置是否取消订阅 |
doFinally | 在最后执行 |
finallyDo() | 注册一个动作,在Observable完成时使用 |
delay() | 延时发射Observable的结果 |
delaySubscription() | 延时处理订阅请求 |
timeInterval() | 转换获取数据发送的时间间隔 |
using() | 创建一个只在Observable生命周期存在的资源 |
single() | 强制返回单个数据,否则抛出异常 |
singleOrDefault() | 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据 |
toFuture(),toIterable(),toList() | 将Observable转换为其它对象或数据结构 |
条件操作符
根据条件发射或变换Observables
名称 | 解析 |
---|---|
amb() | 给定多个Observable,只让第一个发射数据的Observable发射全部数据 |
defaultIfEmpty() | 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 |
switchIfEmpty() | 如果原始Observable没有发射数据,它发射一个备用Observable的发射物 |
(rxjava-computation-expressions) doWhile() | 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止 |
(rxjava-computation-expressions) ifThen() | 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列 |
skipUntil() | 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 |
skipWhile() | 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据 |
(rxjava-computation-expressions) switchCase() | 基于一个计算结果,发射一个指定Observable的数据序列 |
takeUntil() | 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知 |
takeWhile(),takeWhileWithIndex() | 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据 |
(rxjava-computation-expressions)whileDo() | 如果满足一个条件,发射原始Observable的数据,然后重复发射直到不满足这个条件为止 |
(rxjava-computation-expressions): 表示这个操作符当前是可选包 rxjava-
computation-expressions 的一部分,还没有包含在标准RxJava的操作符集合里,需要自己导包,但是这个包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的话,会出现问题。 可以使用repeat操作符代替
compile 'io.reactivex:rxjava-computation-expressions:0.21.0'
布尔操作符列表
对原始数据发射源进行布尔操作,经过布尔操作之后,接收者就是观察者接收到的数据是布尔值。
名称 | 解析 |
---|---|
all() | 判断是否所有的数据项都满足某个条件 |
contains() | 判断Observable是否会发射一个指定的值 |
isEmpty() | 判断Observable是否发射了一个值 |
sequenceEqual() | 判断两个Observables发射的序列是否相等 |
算术操作符
这个模块需要导下面这个包,不支持Rxjava2,所以Rxjava2用不了噢:
compile 'io.reactivex:rxjava-math:1.0.0'
名称 | 解析 |
---|---|
averageInteger() | 求序列平均数并发射 |
averageLong() | 求序列平均数并发射 |
averageFloat() | 求序列平均数并发射 |
averageDouble() | 求序列平均数并发射 |
max() | 求序列最大值并发射 |
maxBy() | 求最大key对应的值并发射 |
min() | 求最小值并发射 |
minBy() | 求最小Key对应的值并发射 |
sumInteger() | 求和并发射 |
sumLong() | 求和并发射 |
sumFloat() | 求和并发射 |
sumDouble() | 求和并发射 |
聚合操作符:
名称 | 解析 |
---|---|
concat() / concatArray | 顺序连接多个Observables |
count() | 计算数据项的个数并发射结果 |
reduce() | 按顺序对Observable发射的每项数据应用一个函数并发射最终的值 |
collect() | 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable |
toList() | 收集原始Observable发射的所有数据到一个列表,然后返回这个列表 |
toSortedList() | 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表 |
toMap() | 将序列数据转换为一个Map,Map的key是根据一个函数计算的 |
toMultiMap() | 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的 |
异步操作符
异步操作符属于单独的rxjava-async模块,它们用于将同步对象转换为Observable。不支持Rxjava2.0,如果使用Rxjava1.0的话,可以导入下面的包就可以使用异步操作符了。
compile 'io.reactivex:rxjava-async-util:0.21.0'
名称 | 解析 |
---|---|
start() | 创建一个Observable,它发射一个函数的返回值 |
toAsync() / asyncAction()/ asyncFunc() | 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值 |
startFuture() | 将一个返回Future的函数转换为一个Observable,它发射Future的返回值 |
deferFuture() | 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它 |
forEachFuture() | 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成 |
fromAction() | 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值 |
fromCallable() | 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常 |
fromRunnable() | 将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值 |
runAsync() | 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions |
ps: 由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。
连接操作符
一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observabe在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
名称 | 解析 |
---|---|
ConnectableObservable.connect() | 指示一个可连接的Observable开始发射数据 |
Observable.publish() | 将一个Observable转换为一个可连接的Observable |
Observable.replay() | 确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅 |
ConnectableObservable.refCount() | 让一个可连接的Observable表现得像一个普通的Observable |
阻塞操作符列表
在Rxjava1中的BlockingObservable已经在Rxjava2中去掉了,在Rxjava2中已经集成到了Observable中。
官方说明:
这里写图片描述https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
BlockingObservable的不同可以看这里:
这里写图片描述http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html
名称 | 解析 |
---|---|
blockingForEach() | 对Observable发射的每一项数据调用一个方法,会阻塞直到Observable完成 |
blockingFirst() | 阻塞直到Observable发射了一个数据,然后返回第一项数据 |
blockingMostRecent() | 返回一个总是返回Observable最近发射的数据的iterable |
blockingLatest() | 返回一个iterable,会阻塞直到或者除非Observable发射了一个iterable没有返回的值,然后返回这个值 |
blockingNext() | 返回一个iterable,阻塞直到返回另外一个值 |
blockingLast() | 阻塞直到Observable终止,然后返回最后一项数据 |
blockingIterable() | 将Observable转换返回一个iterable. |
blockingSingle() | 如果Observable终止时只发射了一个值,返回那个值,否则抛出异常 |
blockingSubscribe() | 在当前线程订阅,和forEach类似 |