Flowable interval源码解析
2019-11-16 本文已影响0人
lonlonlon
使用interval的时候,对interval的原理不是很了解,比如在订阅的函数里取消订阅,是否能马上生效,所以看了一下interval的实现
代码
下面是本次分析的例子
class TestInterval {
var disposable: Disposable? = null
fun test() {
disposable = Flowable.interval(0, 500, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ t ->
disposable?.dispose()
})
}
}
订阅过程
rxjava的订阅是从下向上看的,所以调用依次是FlowableObserveOn、FlowableInterval,详细过程如下图所示

数据发送流程
FlowableInterval会周期性发射数据,经过ObserveOnSubscriber进行线程切换,最后经过LambdaSubscriber把数据传递给业务传入的Consumer。详细流程如下所示

dispose过程
调用dispose方法后,cancel事件会层层传递,传递到IntervalSubscriber,这里会把定期任务取消掉

IntervalSubscriber
final AtomicReference<Disposable> resource = new AtomicReference<Disposable>();
@Override
public void cancel() {
DisposableHelper.dispose(resource);
}
FlowableInterval
@Override
public void subscribeActual(Subscriber<? super Long> s) {
IntervalSubscriber is = new IntervalSubscriber(s);
s.onSubscribe(is);
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
问题分析
有了这个了解之后,回到最初的问题,dispose方法调用后,是否会存在线程同步问题,再执行一次Consumer方法呢?
LambdaSubscriber
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().cancel();
onError(e);
}
}
}
@Override
public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}
@Override
public void dispose() {
cancel();
}
@Override
public void cancel() {
SubscriptionHelper.cancel(this);
}
SubscriptionHelper
public static boolean cancel(AtomicReference<Subscription> field) {
Subscription current = field.get();
if (current != CANCELLED) {
current = field.getAndSet(CANCELLED);
if (current != CANCELLED) {
if (current != null) {
current.cancel();
}
return true;
}
}
return false;
}
能看到调用dispose后,就是把LambdaSubscriber里的Subscription设置为SubscriptionHelper.CANCELLED常量。下次onNext的时候判断如果LambdaSubscriber里的Subscription是CANCELLED,就不会再传递数据了,由于调用LambdaSubscriber onNext是同一个线程,所以不会存在多线程问题。