RxJavaAndroid-Rxjava&retrofit&daggerAndroid-RxJava

Flowable interval源码解析

2019-11-16  本文已影响0人  lonlonlon

使用interval的时候,对interval的原理不是很了解,比如在订阅的函数里取消订阅,是否能马上生效,所以看了一下interval的实现

代码

下面是本次分析的例子

 class TestInterval {
    var disposable: Disposable? = null
    fun test() {
        disposable = Flowable.interval(0, 500, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ t ->
                    disposable?.dispose()
                })
    }
}

订阅过程

rxjava的订阅是从下向上看的,所以调用依次是FlowableObserveOn、FlowableInterval,详细过程如下图所示


image.png

数据发送流程

FlowableInterval会周期性发射数据,经过ObserveOnSubscriber进行线程切换,最后经过LambdaSubscriber把数据传递给业务传入的Consumer。详细流程如下所示


image.png

dispose过程

调用dispose方法后,cancel事件会层层传递,传递到IntervalSubscriber,这里会把定期任务取消掉


image.png

IntervalSubscriber

final AtomicReference<Disposable> resource = new AtomicReference<Disposable>();
  @Override
        public void cancel() {
            DisposableHelper.dispose(resource);
        }

FlowableInterval

   @Override
    public void subscribeActual(Subscriber<? super Long> s) {
        IntervalSubscriber is = new IntervalSubscriber(s);
        s.onSubscribe(is);

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

问题分析

有了这个了解之后,回到最初的问题,dispose方法调用后,是否会存在线程同步问题,再执行一次Consumer方法呢?

LambdaSubscriber

  @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().cancel();
                onError(e);
            }
        }
    }

  @Override
    public boolean isDisposed() {
        return get() == SubscriptionHelper.CANCELLED;
    }

 @Override
    public void dispose() {
        cancel();
    }
 @Override
    public void cancel() {
        SubscriptionHelper.cancel(this);
    }

SubscriptionHelper

  public static boolean cancel(AtomicReference<Subscription> field) {
        Subscription current = field.get();
        if (current != CANCELLED) {
            current = field.getAndSet(CANCELLED);
            if (current != CANCELLED) {
                if (current != null) {
                    current.cancel();
                }
                return true;
            }
        }
        return false;
    }

能看到调用dispose后,就是把LambdaSubscriber里的Subscription设置为SubscriptionHelper.CANCELLED常量。下次onNext的时候判断如果LambdaSubscriber里的Subscription是CANCELLED,就不会再传递数据了,由于调用LambdaSubscriber onNext是同一个线程,所以不会存在多线程问题。

上一篇 下一篇

猜你喜欢

热点阅读