RxJava 连接操作符
cache/cacheWithInitialCapacity
看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。
val list = arrayListOf(1,2,3)
val ob = Observable.fromIterable(list)
// .cache()
list.clear()
ob.subscribe(observerInt)
假设向上面这般用法,无论有没有 cache,Observer 收到的都是只有一个 onComplete。
val list = arrayListOf(1,2,3)
val ob = Observable.fromIterable(list)
.cache()
ob.subscribe(observerInt)
list.clear()
ob.subscribe(observerInt)
现在如果没有 cache,第一个会收到三次 onNext 和一次 onComplete,clear 之后由于数据清空,只会收到 onComplete。而有了 cache,两个订阅得到的结果都是三次 onNext 和一次 onComplete。
这说明是在有了一个观察者订阅之后,会把被观察者发射的数据缓存起来,这适合多个观察者存在时,其它还没有立刻订阅的观察者也能通过缓存拿到最初的数据。
cacheWithInitialCapacity 的参数表示内部用的缓冲区大小,对外界使用没区别,cache 方法用的是 16.
publish
将普通的 Observable 变成可连接的 ConnectableObservable,它不会在被订阅时发射数据,而是直到使用了connect 操作符时才开始。用这种方法,可以控制在任何时候让 Observable 开始发射数据。
public final ConnectableObservable<T> publish()
// 用 Function 转换源 Observable 发射的数据
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
val ob = Observable.just(1,2,3)
.doOnSubscribe { Log.e("RX", "onSubscribe")}
.publish()
Log.e("RX", "subscribe")
ob.subscribe(observerInt)
Thread.sleep(2000)
ob.connect()
2 秒后执行 connect 在 onSubscribe。
replay
ConnectableObservable 和普通的 Observable 最大的区别就是,调用 connect 操作符开始发射数据,后面的订阅者会丢失之前发射过的数据。
var ob = Observable.interval(1, 100, TimeUnit.MILLISECONDS).take(6)
ob = ob.publish()
ob.subscribe{ Log.e("RX", "observer 1 onNext $it") }
ob.connect()
Thread.sleep(400)
ob.subscribe{ Log.e("RX", "observer 2 onNext $it") }
ob.connect()
日志:
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 1 onNext 5
observer 2 onNext 5
可见 observer2 丢了 0-4,使用 replay 返回的 ConnectableObservable 会缓存订阅者订阅之前已经发射的数据,可以指定缓存的大小或者时间,这样能避免耗费太多内存。
public final ConnectableObservable<T> replay()
public final ConnectableObservable<T> replay(final int bufferSize)
public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit)
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
public final ConnectableObservable<T> replay(final int bufferSize, final Scheduler scheduler)
public final ConnectableObservable<T> replay(long time, TimeUnit unit)
public final ConnectableObservable<T> replay(final long time, final TimeUnit unit, final Scheduler scheduler)
public final ConnectableObservable<T> replay(final Scheduler scheduler)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final Scheduler scheduler)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, long time, TimeUnit unit)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler)
public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler)
重载方法很多,但大致可分为两类,一类返回 ConnectableObservable,一类有参数 selector,可以变换源 Observable 发射的数据,然后将这些数据放到一个 Observable 中,方法返回 Observable。
缓存 2 个数
ob = ob.replay(2)
日志
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 2 onNext 3
observer 2 onNext 4
observer 1 onNext 5
observer 2 onNext 5
可见 observer2 还取到了被缓存的 3 和 4。
缓存 300ms 内的数据
ob = ob.replay(300, TimeUnit.MILLISECONDS)
日志:
observer 1 onNext 0
observer 1 onNext 1
observer 1 onNext 2
observer 1 onNext 3
observer 1 onNext 4
observer 2 onNext 2
observer 2 onNext 3
observer 2 onNext 4
observer 1 onNext 5
observer 2 onNext 5
收到了前 300ms 缓存的 3,4,5。
其中第二类看源码内部也调用了第一种的 replay,subscribe 时内部会自动执行 connect。
val ob2 = ob.replay({
it.map { it*10 }
}, 2)
ob2.subscribe{ Log.e("RX", "observer 1 onNext $it") }
Thread.sleep(400)
ob2.subscribe{ Log.e("RX", "----------------observer 2 onNext $it") }
但两个观察者都收到了所有数据,和想象不同。
它不像第一类,它是每次 subscribe 时内部都对普通的 Observable 执行第一类的 replay,再往内部走是 new 了一个 ConnectableObservable。所以两次 subscribe 内部用的是两个 ConnectableObservable 对象。
暂不清楚它的应用场景在哪里。
share
public final Observable<T> share() {
return publish().refCount();
}
refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。