RxJava 变换操作符
buffer
间隔固定个数缓存
public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier)
// buffer(count, count, bufferSupplier)
public final <U extends Collection<? super T>> Observable<U> buffer(int count, Callable<U> bufferSupplier)
// buffer(count, skip, ArrayListSupplier.<T>asCallable()) 第三个参数的 call 回调返回一个空列表
public final Observable<List<T>> buffer(int count, int skip)
// buffer(count, count)
public final Observable<List<T>> buffer(int count)
按照规定大小缓存,每次取 count 个数,取完一次跳过 skip 个数,将每次取的数据合并到一个列表里。
Observable.just(1,2,3,4,5,6,7,8)
.buffer(4, 2)
.subscribe({Log.e("RX", "onNext $t")})
看下日志:
onNext [1, 2, 3, 4]
onNext [3, 4, 5, 6]
onNext [5, 6, 7, 8]
onNext [7, 8]
每次取出 4 个数缓存,然后跳过 2 个数再缓存并且发射这一次的列表。
val list = mutableListOf<Int>()
Observable.just(1,2,3,4,5,6,7,8)
.buffer(4, 2, { list })
.subscribe(observer)
日志:
onNext [1, 2, 3, 3]
onNext [1, 2, 3, 3, 4]
onNext [1, 2, 3, 3, 4, 5]
onNext [1, 2, 3, 3, 4, 5, 7]
这就奇怪了,与想象不同,看两个参数的 buffer 方法,也是创建了 ArrayListSupplier.<T>asCallable() 调用三个参数的方法,最后发现两者的不同在于 ArrayListSupplier.<T>asCallable() 里的 call() 每次调用都 new 了一个新的 ArrayList,而我上面的代码用的是同一个 ArrayList。
将上述代码修改成
Observable.just(1,2,3,4,5,6,7,8)
.buffer(4, 2, { mutableListOf<Int>() })
.subscribe(observer)
此时看日志就和想象中的一样了。
源码分析
进源码发现,当 count 和 skip 不相等时,会先创建 BufferSkipObserver 来接收 Observable 发射的数据,也就相当于先拦截下,在这个 Observer 里做缓存,然后将缓存后的数据发送给用户订阅的那个 Observer,所以主要逻辑看 BufferSkipObserver 的 onNext 方法。
@Override
public void onNext(T t) {
if (index++ % skip == 0) {
U b;
try {
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
buffers.clear();
s.dispose();
actual.onError(e);
return;
}
buffers.offer(b);
}
Iterator<U> it = buffers.iterator();
while (it.hasNext()) {
U b = it.next();
b.add(t);
if (count <= b.size()) {
it.remove();
actual.onNext(b);
}
}
}
buffers 是 ArrayDeque<U>
,它是一个双端队列,内部存储的 U 是一个集合,就是我们调 buffer 方法第三个参数的 call 方法提供的集合。
首先在这个例子里,发射的数据是 1-8,索引也就是源码里的 index 是 0-7,skip 是 2,这样 index % skip 的结果就分别是 0,1,0,1,0,1,0,1
,而我们看到,当这个取余结果为 0 的时候,就通过 Callable 的 call 方法返回一个列表,并且把它加入到 buffers 这个队列的末尾。
-
首先数据 1,会取出一个 ArrayList 加入 buffers,然后取 buffers 的迭代器,因为只有一个元素,循环一次,取出里面的 list,把 1 添加进去,结果是这样的
buffer1.png -
然后数据 2,index % skip 不为 0,不会向 buffers 里添加,迭代取出里面的 list,添加 2,此时是
buffer2.png -
数据 3,会又通过 bufferSupplier.call() 取出一个 list,而我代码不是创建一个新的 list,而是依然用原来的 list,这样 buffers 先变成了
buffer3.png就是 buffers 里有了两个值,都是指向同一个 list 的指针,然后遍历 buffers,第一次遍历取出的是 list,添加了 3,第二次遍历取出的还是这个 list,又要添加 3,因此这个 list 里面有了两个 3
buffer4.png在第二次遍历中,此时 list 的长度为 4,等于 count,于是会删除 buffers 这个队列的第一个元素,并把这个 list 的内容发射出去,于是自己定义的观察者就先收到了
1,2,3,3
。结果变成了这样
buffer5.png -
数据 4,索引 3 和 skip 取余不为 0,直接在 list 上加了 4,并且列表长度为 5,大于 count,buffers 删除头部元素并发射出去,此时 buffers 里面空了,什么都没有
buffer6.png -
数据 5,又在 buffers 上加个一个元素,也是指向 list 的指针,由于 list 用的同一个,所以每次添加数据都会立刻大于 count,立刻发射数据,所以 1,2,3,3,4,5 也被发射了出去,并且 buffers 刚添加了又被 remove 掉,里面又空了
-
再看数据 6,index % skip 为 1,所以 buffers 不会添加数据,而上一次 buffers 已经空了,所以这次迭代后发现 hasNext 是 false,于是 6 这个数据就失踪了
-
到了 7,才会 buffers 中又添加一个指向 list 的指针,并添加了 7 发射出去,于是最终 buffers 里是空的,外面的观察者收到了 1,2,3,3,4,5,7
间隔固定时间缓存
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)
// 上面两个方法调用这个方法
public final <U extends Collection<? super T>> Observable<U> buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier) {
...
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timeskip, unit, scheduler, bufferSupplier, Integer.MAX_VALUE, false));
}
public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count)
// 上面四个方法最后都调用这个方法
public final <U extends Collection<? super T>> Observable<U> buffer(
long timespan, TimeUnit unit,
Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize) {
...
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize));
}
最后都是内部创建 ObservableBufferTimed 来实现。
Observable.just(1,2,3,4,5,6,7,8)
.map {
Thread.sleep(100)
it
}
.buffer(150, 200, TimeUnit.MILLISECONDS)
.subscribe({Log.e("RX", "onNext $t")})
每隔 150ms 取一次,跳过 200ms,包含那 150ms 在内,结果是:
onNext [1]
onNext [2, 3]
onNext [4, 5]
onNext [6, 7]
onNext [8]
window
和 buffer 类似,但不是变成一个个列表发射,而是多个 Observable,每个 Observable 发射一个子集。
public final Observable<Observable<T>> window(long count)
public final Observable<Observable<T>> window(long count, long skip)
public final Observable<Observable<T>> window(long count, long skip, int bufferSize)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, long count)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, long count, boolean restart)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart, int bufferSize)
public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary, int bufferSize)
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary, int bufferSize)
public final <U, V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator, Function<? super U, ? extends ObservableSource<V>> closingIndicator)
public final <U, V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator, Function<? super U, ? extends ObservableSource<V>> closingIndicator, int bufferSize)
val ob = Observable
.intervalRange(0,10,0,100,TimeUnit.MILLISECONDS)
.take(10)
// 一个Observable开始发射数据
// 0
// 1
// 一个Observable开始发射数据
// 2
// 2
// 3
// 一个Observable开始发射数据
// 4
// 4
// 5
// 一个Observable开始发射数据
// 6
// 6
// 7
// 一个Observable开始发射数据
// 8
// 8
// 9
ob.window(3,2).subscribe {
it.doOnSubscribe { Log.e("RX", "一个Observable开始发射数据") }
.subscribe { Log.e("RX", "$it") }
}
// 一个Observable开始发射数据
// 0
// 1
// 2
// 一个Observable开始发射数据
// 4
// 5
// 6
// 7
// 一个Observable开始发射数据
// 8
// 9
ob.window(300,400, TimeUnit.MILLISECONDS).subscribe {
it.doOnSubscribe { Log.e("RX", "一个Observable开始发射数据") }
.subscribe { Log.e("RX", "$it") }
}
// 参数的 Observable 发射前的数据
// 0
// 1
// 2
ob.window(Observable.timer(300, TimeUnit.MILLISECONDS)).subscribe {
it.subscribe { Log.e("RX", "$it") }
}
// 屁用没有
ob.window(Observable.timer(300, TimeUnit.MILLISECONDS)
, Function<Long, Observable<Long>>{ Observable.timer(200, TimeUnit.MILLISECONDS)} )
.subscribe {
it.subscribe { Log.e("RX", "$it") }
}
blockingIterable
返回一个迭代器
val ob = Observable.just(1,2,3,4,5,6)
val iterable = ob.blockingIterable()
iterable.forEach {Log.e("RX", "$it")}
blockingForEach
阻塞直到 forEach 都执行完毕了,才会发射给观察者。
val ob = Observable.just(1,2,3,4,5,6)
ob.blockingForEach {
Thread.sleep(1000)
Log.e("RX", "$it")
}
ob.subscribe({ Log.e("RX", "收到 $it") })
05-12 23:33:40.729 3651-3651/pot.ner347.androiddemo E/RX: 1
05-12 23:33:41.730 3651-3651/pot.ner347.androiddemo E/RX: 2
05-12 23:33:42.731 3651-3651/pot.ner347.androiddemo E/RX: 3
05-12 23:33:43.732 3651-3651/pot.ner347.androiddemo E/RX: 4
05-12 23:33:44.733 3651-3651/pot.ner347.androiddemo E/RX: 5
05-12 23:33:45.735 3651-3651/pot.ner347.androiddemo E/RX: 6
05-12 23:33:45.755 3651-3651/pot.ner347.androiddemo E/RX: 收到 1
收到 2
收到 3
收到 4
收到 5
收到 6
blockingLatest
最近发射的数据加入到 Iterator 并返回。
险些没把人搞死,几个小时,各种方法试了,得到的 Iterator 一直是空的,注释说的又不清不楚的。太浪费时间了,感觉不值,但又不想漏过去。
val source = Observable.interval(1, TimeUnit.MICROSECONDS).take(1000)
val iterable = source.blockingLatest()
for (i in iterable) {
Log.e("RX", "$i")
}
1 秒中发射一次,而 iterable 依次往后迭代。
val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
(1..100).forEach {
emitter.onNext(it)
Log.e("RX", "onNext $it")
Thread.sleep(100)
}
})
val iterator = ob.blockingLatest()
for (i in iterator) {
Log.e("RX", "$i")
}
可为毛上面的代码就不行呢,iterator 里只有最后一个 100,真是气死了。那 interval 和我这 create 有什么区别,不都是 1 秒发一次嘛,看 interval 源码,看到里面是创建了 worker 线程,然后定时执行的,所以 create 这个也放到子线程去,居然可以了
val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
(1..100).forEach {
emitter.onNext(it)
Log.e("RX", "onNext $it")
Thread.sleep(100)
}
}).subscribeOn(Schedulers.newThread())
val iterator = ob.blockingLatest()
for (i in iterator) {
Log.e("RX", "$i")
}
blockingMostRecent
val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
(1..20).forEach {
emitter.onNext(it)
Log.e("RX", "onNext $it")
Thread.sleep(100)
}
}).subscribeOn(Schedulers.newThread())
val iterator = ob.blockingMostRecent(-10)
for (i in iterator) {
Log.e("RX", "$i")
}
从日志看,blockingLatest 迭代器每次取出的是最近发射的那一个,而 blockingMostRecent 返回同样的数据多次,比如
...
onNext 1
1
1
1
1
1
1
1
1
1
1
1
2
2
3
onNext 3
3
3
4
onNext 4
4
4
onNext 5
5
5
6
6
onNext 6
6
6
...
它会重新确认最近的值,即使已经确认过,直到下一个值被发射才停止。如果没有值发射,会用参数的默认值。
blockingNext
返回一个阻塞的 Iterator,直到发射一个数据才返回这个数据。
val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
(1..20).forEach {
emitter.onNext(it)
Log.e("RX", "onNext $it")
Thread.sleep(100)
}
}).subscribeOn(Schedulers.newThread())
val iterator = ob.blockingNext()
for (i in iterator) {
Log.e("RX", "$i")
}
blockingSubscribe
val ob = Observable.create(ObservableOnSubscribe<Int> {
it.onNext(1)
it.onNext(2)
// it.onNext(1/0)
it.onNext(3)
it.onComplete()
})
ob.blockingSubscribe( {
Log.e("RX", "$it")
}, {Log.e("RX", "exception")}, {Log.e("RX", "complete")})
搞出这些方法到底有什么用吗?当前线程给出回调,知道执行了什么吗?又能怎样?
还有个无参的构造方法:
ignoring any values and rethrowing any exception
这意思是忽略掉发射的数据,只抛出异常。如果发射数据,就不管,当发生异常时,抛出来,也没觉得有什么特别的意义,就用 Observer 也行啊。
cast
将发射的源数据都强制转换成另一种类型。只能是父类转为子类。内部调用了 map。
由于 Kotlin 的类型推断,demo 里没想到例子来表现用它前后的区别。
map
对发射的每个事件的数据执行一个函数进行转换,将转换后的数据发给观察者。
Observable.just(1,2,3)
.map({ "number is $it" })
.subscribe(observerStr)
map 的参数是一个 Function 接口,泛型 T 是输入参数类型,R 是转换后返回的类型
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
flatMap
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize)
// 下面四个方法和上面的是一类
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency)
// flatMap(ObservableInternalHelper.flatMapWithCombiner(mapper, combiner), delayErrors, maxConcurrency, bufferSize)
public final <U, R> Observable<R> flatMap(final Function<? super T, ? extends ObservableSource<? extends U>> mapper,
final BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency, int bufferSize)
// 下面四个方法和上面的是一类
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> resultSelector)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> combiner, int maxConcurrency)
// 内部经历了 Notification 的转换,无论 onComplete,onError 都会作为一个数据通过 onNext 发射出去
public final <R> Observable<R> flatMap(
Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper,
Function<? super Throwable, ? extends ObservableSource<? extends R>> onErrorMapper,
Callable<? extends ObservableSource<? extends R>> onCompleteSupplier)
public final <R> Observable<R> flatMap(
Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper,
Function<Throwable, ? extends ObservableSource<? extends R>> onErrorMapper,
Callable<? extends ObservableSource<? extends R>> onCompleteSupplier,
int maxConcurrency)
val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
.flatMap({
Observable.fromIterable(it)
}).subscribe(observerInt) // 收到 5 次 onNext
// 会收到
// -1-:[1, 2, 3]
// -2-:[1, 2, 3]
// -3-:[1, 2, 3]
// -4-:[4, 5]
// -5-:[4, 5]
Observable.just(list1, list2)
.flatMap( Function<List<Int>, ObservableSource<String>>{
// 将 int 转成一个字符串
Observable.fromIterable(it.map { "-$it-" })
}, BiFunction<List<Int>, String, MutableMap<String, List<Int>>> { // 这个 combiner 参数就是将源和转变后的组合返回一个新结构
t1, t2 ->
mutableMapOf(t2 to t1)
}).subscribe({
it.map { Log.e("RX", "${it.key}:${it.value}") }
})
// 会收到
// -1-
// -2-
// -3-
// -4-
// -5-
// complete onNext 收到
// onComplete
Observable.just(list1, list2)
.flatMap({ Observable.fromIterable(it.map { "-$it-" }) },
{ Observable.just( "${it.message}" ) },
{ Observable.just("complete") })
.subscribe(observerStr)
flatMapIterable
map 将一个数据转成另一种,是一对一的,flatMap 是将一个数据变成一个 Observable,内部可能发射多次,可以看成一对多,flatMapIterable 是将一个数据变成一个 Iterable,也可以认为是一对多。
Observable.just(1,10)
.flatMapIterable { listOf(it, it+1, it+2) }
.subscribe(observerInt)
依次发射 1,2,3,10,11,12。效果等价的 flatMap 写法
Observable.just(1,10)
.flatMap { Observable.fromIterable(listOf(it, it+1, it+2)) }
.subscribe(observerInt)
Observable.just(1,10)
.flatMapIterable( Function<Int, List<Int>>{ listOf(it, it+1, it+2) },
BiFunction<Int, Int, String> { t1, t2 -> "$t1:$t2"})
.subscribe(observerStr)
发射 1:1,1:2,1:3,10:10,10:11,10:12。
flatMapCompletable
val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
.flatMapCompletable { t ->
CompletableSource { cs ->
Log.e("RX", "cs $t")
cs.onComplete()
}
}.subscribe(object : CompletableObserver {
override fun onComplete() {
Log.e("RX", "onComplete")
}
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {
Log.e("RX", "onError")
}
})
看源码是每次 onNext 发射一个值,先加到一个队列里保存起来。内部先拦下来,调用 flatMapCompletable 参数那个 Function 的 apply 方法获得一个 CompletableSource 对象,然后又调用它的 subscribe 方法。
必须调用 cs.onComplete()
,内部的一个布尔值 active 为 false,进入 for 循环才能继续下一次执行。
每发一次都通过一个 onComplete 标记,最终外层的 Observer 收到一个 onComplete 事件。
flatMapMaybe
将源 Observable 发射的值执行 map 操作放到 Maybe 中发射。
val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
.flatMapMaybe { t ->
Maybe.create(MaybeOnSubscribe<Int> { emitter ->
t.map { it * 2 }.forEach {
emitter.onSuccess(it)
}
})
}.subscribe(object : Observer<Int> {
override fun onComplete() { textView.text = "${textView.text}\n onComplete" }
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: Int) { textView.text = "${textView.text}\n $t" }
override fun onError(e: Throwable) {}
})
只收到了 2 和 8,因为 list 在 Maybe 中被展开然后发射,但由于 Maybe 是 Single 和 Completable,所以第一个列表发了 2 就结束,然后第二个列表发了 8 就结束。
如果在 Maybe 里这样:
if (list1.size == 3) emitter.onComplete()
t.map { it * 2 }.forEach {
emitter.onSuccess(it)
}
只发了一个 onComplete 就结束了,并未收到 8。
flatMapSingle
val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
.flatMapSingle { t ->
Single.create(SingleOnSubscribe<Int> { emitter ->
t.map { it * 2 }.forEach {
emitter.onSuccess(it)
}
})
}.subscribe(object : Observer<Int> {
override fun onComplete() { textView.text = "${textView.text}\n onComplete" }
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: Int) { textView.text = "${textView.text}\n $t" }
override fun onError(e: Throwable) {}
})
收到 2 和 8 和 onComplete。
switchMap/switchMapDelayError/switchMapSingle/switchMapSingleDelayError
和 flatMap 类似,区别是当源 Observable 发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的 Observable,多线程情况使用。
比如源 Observable 产生 A、B、C 三个结果,通过 switchMap 的映射规则,映射后从 ObservableA 发射 A1、A2,ObservableB 发射 B1、B2,ObservableC 发射 C1、C2。当在产生 B1 的同时,C1 已经产生了,这样就会忽略 B1 且不再监视 ObservableB。
concatMap/concatMapEager
concatMap 用法和 flatMap 一样,只是它能保证发射是有序的。
val list1 = listOf("a","b","c")
val list2 = listOf("e","f","g")
val list3 = listOf(list1, list2)
Observable.fromIterable(list3)
.concatMap({
val list4 = it.map { "--$it--" }
Observable.fromIterable(list4)
}).subscribe(observerStr)
concatMapEager 是并发处理内部 Observable,但是会引起更多的内存占用。关于 flatMap、concatMap、concatMapEager 的对比可以参考这篇文章:https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html
concatMapDelayError/concatMapEagerDelayError
concatMapDelayError 和 concatMapEagerDelayError 是推迟 error 事件,这两者最后有个 tillTheEnd 的布尔型参数,默认为 true。
先看 concatMapDelayError,关于 tillTheEnd 的注释
if true, all errors from the outer and inner ObservableSource sources are delayed until the end,
if false, an error from the main source is signalled when the current ObservableSource source terminates
但测试结果与想象的并不一样。
val list1 = listOf("a","b","c")
val list2 = listOf("e","f","g")
Observable.create(ObservableOnSubscribe<List<String>> {
it.onNext(list1)
it.onError(Throwable())
it.onNext(list2)
}).concatMapDelayError({list ->
Observable.fromIterable(list)
}, 10, true).subscribe(observerStr)
外面的 Observable 发射了 onError,无论 tillTheEnd 是 true 还是 false,observerStr 都收到了 list1 的三个字符串,然后收到一个 onError。
Observable.just(list1, list2).concatMapDelayError({list ->
Observable.create(ObservableOnSubscribe<String> { emitter ->
(0..list.count()).forEach {
if (2 == it) emitter.onError(Throwable()) else emitter.onNext(list[it])
}
})
}, 10, false).subscribe(observerStr)
外面的 Observable 正常发射,里面发了 onError,无论 tillTheEnd 为 true 还是 false,都是直接崩溃。
concatMapEagerDelayError 也是一样的现象。
concatMapIterable
Observable.just(1, 10)
.concatMapIterable { mutableListOf(it+1,it+2,it+3,it+4) }.subscribe(observerInt)
一个变多个。
concatMapCompletable
Observable.just(1,2,3,4,5)
.concatMapCompletable { t ->
CompletableSource { cs ->
Log.e("RX", "cs $t")
cs.onComplete()
}
}
.subscribe(object : CompletableObserver {
override fun onComplete() {
Log.e("RX", "onComplete")
}
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {
Log.e("RX", "onError")
}
})
和 flatMapCompletable 基本一个意思。
forEach
public final Disposable forEach(Consumer<? super T> onNext) {
return subscribe(onNext);
}
对每一个发射的数据执行一个操作,其实就是用参数去订阅。
forEachWhile
Observable.just(1,2,3)
.forEachWhile { // 如果返回 false,内部会调用 onComplete
Log.e("RX", "$it")
it < 2
}
// 加了 onError 或 onComplete 时的回调
// 到 3 时就 false,会执行一个 onComplete 所以还会收到 complete
Observable.just(1,2,3,4)
.forEachWhile ( Predicate<Int>{
Log.e("RX", "$it")
it < 3
}, Consumer<Throwable>{ Log.e("RX", "error msg = ${it.message}")},
Action { Log.e("RX", "complete")})
groupBy
将一个 Observable 分拆为一些 GroupedObservable 集合,每一个 GroupedObservable 发射原始 Observable 的一个子序列。
五个重载方法,可以分成两类,一种只有 keySelector,一种同时又 keySelector 和 valueSelector。其实只有 keySelector 的内部使用了一个默认的 valueSelector,就是原样返回传给它的 value。
哪个数据项由哪一个 GroupedObservable 发射,由 group 参数 Function 的 apply 方法决定,在 apply 里进行分组,给每项数据指定一个 Key,Key 相同的数据会被同一个 GroupedObservable 发射。
Observable.fromArray("a", "ab", "b", "c", "abc", "bc")
.groupBy(object : Function<String, Int> {
override fun apply(t: String): Int = t.length
}).subscribe(object: Consumer<GroupedObservable<Int, String>> {
override fun accept(t: GroupedObservable<Int, String>) {
if (t.key != 2) {
t.subscribe { Log.e("RX", "${t.key},$it") }
}
}
})
为了便于理解泛型的对应关系,没有完全用 Lambda 写。上面的例子中,Observable 本来要发射 6 个字符串,在 groupBy 方法中,t.length 相同的被划分为一组,所以 a,b,c 一组,ab,bc 一组,abc 一组,会有 3 个 GroupedObservable,在 subscribe 中,对 GroupedObservable 的 key 进行判断,长度为 2 的这一组不接收,Log 显示:
1,a
1,b
1,c
3,abc
Observable.fromArray("a", "ab", "b", "c", "abc", "bc")
.groupBy(
Function<String, Int> { t -> t.length },
Function<String, String> { t -> "o $t o" })
.subscribe { t ->
t.subscribe { Log.e("RX", "${t.key},$it") }
}
第二个参数就是对值做一个变换。
1,o a o
2,o ab o
1,o b o
1,o c o
3,o abc o
2,o bc o
safeSubscribe
看源码内部捕获了回调方法的异常。
val observer = object : Observer<Int> {
override fun onComplete() {}
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: Int) { t/0 } // 产生异常
override fun onError(e: Throwable) {}
}
// Observable.just(1).subscribe(observer) // 崩溃
Observable.just(1).safeSubscribe(observer) // 内部捕获了
scan/scanWith
和 reduce 类似,也是一种累计运算,区别在于 reduce 计算完毕后发射一个结果,而 scan 是在累计的过程中,每次都发射。
// 作为观察者,收到 3 次事件,值分别是 1,2,6。
Observable.just(1, 2, 3)
.scan({ t1, t2 -> t1 * t2 }).subscribe {
textView.text = "${textView.text}\n $it"
}
// 有初始值,收到了 4 次事件,分别为 10,10,20,60
Observable.just(1, 2, 3)
.scan(10, { t1, t2 -> t1 * t2 }).subscribe {
textView.text = "${textView.text}\n $it"
}
// 其实上一种内部也是调用的这一种
Observable.just(1, 2, 3)
.scanWith({ 10 }, { t1, t2 -> t1 * t2 }).subscribe {
textView.text = "${textView.text}\n $it"
}
sorted
排序后发射。源 Observable 发射的数据必须是实现了 Comparable 的,即是可以比较的。
// 默认按自然顺序排序,收到 1,3,5,9
Observable.just(5,3,9,1).sorted().subscribe(observerInt)
// 使用自定义的排序算法,逆序,收到 9,5,3,1
Observable.just(5,3,9,1).sorted { o1, o2 -> o2-o1 }.subscribe(observerInt)
to
通过 Function 将一个 Observable 变成另一个。
// onNext 2
// onNext 4
// onComplete
Observable.just(1,2).to { it.map {it * 2} }.subscribe(observerInt)
toList
public final Single<List<T>> toList() {
return toList(16);
}
// 第二个参数指定列表的初始长度
public final Single<List<T>> toList(final int capacityHint)
// 通过参数提供一个列表
public final <U extends Collection<? super T>> Single<U> toList(Callable<U> collectionSupplier) {
把发射的数据组合成列表在一起发射。
// 发射一次,内容是一个列表 [2,4,1,3,5]
Observable.just(2,4,1,3,5).toList()
.subscribe(Consumer<List<Int>> {
textView.text = "${textView.text}\n ${it.toList()}"
})
// 参数提供一个列表,里面已经有一个值 10
// 收到的是 [10,2,4,1,3,5
Observable.just(2,4,1,3,5)
.toList{ mutableListOf(10)}
.subscribe(Consumer<List<Int>> {
textView.text = "${textView.text}\n ${it.toList()}"
})
toSortedList
public final Single<List<T>> toSortedList()
public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)
public final Single<List<T>> toSortedList(final Comparator<? super T> comparator, int capacityHint)
public final Single<List<T>> toSortedList(int capacityHint)
相比 toList,发射的列表是排好序的。
// 只发射一次 onNext([1,2,3,4,5])
Observable.just(2,4,1,3,5).toSortedList()
.subscribe(Consumer<List<Int>> {
textView.text = "${textView.text}\n ${it.toList()}"
})
// 只发射一次 onNext([5,4,3,2,1])
Observable.just(2,4,1,3,5).toSortedList { t1, t2 -> t2-t1}
.subscribe(Consumer<List<Int>> {
textView.text = "${textView.text}\n ${it.toList()}"
})
toMap
// 用 keySelector 提供的数据作为 key,源 Observable 发射的数据作为 value,封装成 Map 发射
public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
// 分别使用 keySelector 和 valueSelector 作为 key 和 value
public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends K> keySelector, final Function<? super T, ? extends V> valueSelector)
// 最后一个参数提供 Map,默认的是 HashMap,逆序
public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends K> keySelector, final Function<? super T, ? extends V> valueSelector, Callable<? extends Map<K, V>> mapSupplier)
val ob = Observable.just(2,2,4,4,1)
val consumer = Consumer<Map<String, Int>> {
for (map in it) {
textView.text = "${textView.text}\n key:${map.key} value:${map.value}"
}
}
// key 相同,value 会覆盖,逆序输出
// key:-4- value:4
// key:-2- value:2
// key:-1- value:1
ob.toMap({ "-$it-" }).subscribe(consumer)
// key:-4- value:28
// key:-2- value:0
// key:-1- value:7
ob.toMap({ "-$it-" }, {it*Random().nextInt(10)}).subscribe(consumer)
// 使用自己传进去的 Map,里面有了一对数据,内部是 LinkedMap
// key:a value:100
// key:-2- value:8
// key:-4- value:32
// key:-1- value:6
ob.toMap({ "-$it-" }, {it*2}, { mutableMapOf("a" to 100) }).subscribe(consumer)
toMultimap
和 toMap 类似,只是这里相同 key 的 value 不会覆盖,而是将多个数据组成一个 List 作为 value。构造方法参数和 toMap 类似,只是多一个重载方法,最后一个参数提供用作 value 的那个 List。
public final <K, V> Single<Map<K, Collection<V>>> toMultimap(
final Function<? super T, ? extends K> keySelector,
final Function<? super T, ? extends V> valueSelector,
final Callable<? extends Map<K, Collection<V>>> mapSupplier,
final Function<? super K, ? extends Collection<? super V>> collectionFactory)
val ob = Observable.just(2,2,4,4,1)
val consumer = Consumer<Map<String, Collection<Int>>> {
for (map in it) {
textView.text = "${textView.text}\n key:${map.key} value:${map.value}"
}
}
// key 相同的 value 会进入一个列表
// key:-4- value:[4,4]
// key:-2- value:[2,2]
// key:-1- value:[1]
ob.toMultimap { "-$it-" }.subscribe(consumer)
// key:-4- value:[4,16]
// key:-2- value:[0,18]
// key:-1- value:[2]
ob.toMultimap({ "-$it-" }, {it*Random().nextInt(10)}).subscribe(consumer)
toFlowable
public final Flowable<T> toFlowable(BackpressureStrategy strategy)
参数指定背压策略。
toFuture
如果 Observable 没有发射值,抛出 NoSuchElementException 异常,只有发射一个值时,可以通过 Future 来获取,如果发射两个以上的值,抛出 IllegalArgumentException,注释是这么说的,可是写代码测试 toFuture 并没有异常,只是这个 Future 里面没数据,get 方法时会产出 IndexOutOfBoundsException。
try {
// val i = Observable.empty<Int>().toFuture().get()
// val i = Observable.just(1).toFuture().get()
val i = Observable.just(1, 2).toFuture().get()
textView.text = "${textView.text}\n future $i"
} catch (e: Exception) {
textView.text = "${textView.text}\n ${e.message}"
}