Android Developers

Rxjava订阅和取消流程分析

2020-04-07  本文已影响0人  A_si

事件序列

简单使用:

Observable.just(1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Observable:

Observable<Integer> just = Observable.just(1); 是一个被观察者,看看just做了什么:

  public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

onAssembly参数new了一个ObservableJust,函数体只是创建一个钩子函数。查下ObservableJust

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

把发射的数据保存为value,然后重写subscribeActual,记住这个函数,继续查看订阅发生了什么:

订阅:

  public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

一些判空,observer = RxJavaPlugins.onSubscribe(this, observer);也是创建钩子,然后调用了subscribeActual,这个才是订阅发生的真实事件,返回查看这个方法,一共有三部:

  1. new了ScalarDisposable对象,
  2. 调用observer.onSubscribe(sd)
  3. 调用第一步new出来的对象的run方法,也就是observer.onNext(value);,如果发射完成还会调用observer.onComplete();

这是一个简单的订阅流程。上游发射数据,订阅的时候,observe调用自己的方法。

数据转换

上面流程加了map操作:

        Observable.just(1)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                })

查看map的实现:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

和just几乎一样,只不过onAssembly的参数是ObservableMapObservableMap和上面的ObservableJust结构一样,只不过这里保存的不再是value,而是一个Function,我们需要关注的也是这个方法:

  @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

订阅的时候会调用这个方法,这里new了一个MapObserver,并和上游完成订阅,在这里订阅的时候,依然会调用MapObserver的onNext方法。而onNext方法等于一个代理,调用了下游订阅的Observe,也就是代码里写的那个Observe

 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

整个流程就是,把原来的Observable转化为ObservableMap,然后在订阅的时候,为ObservableMap添加一个订阅者MapObserver,接受上游的数据,在这个订阅者里面转化数据并回调给原来的Observe

disposed取消订阅

取消分为以下三种情况:

  1. 无延迟无后续,single.just之类,只发射一个数据;
  2. 有延迟无后续,delay之类,延迟发射一次数据;
  3. 有延迟有后续,interval之类,轮询发射数据;

无延迟无后续

        Single.just(1)
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(Integer integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

这个订阅事件只发射一次,并且没有延迟,上面我们看过subscribeActual,知道onSubscribe的参数是哪一个,下面查看代码,看看取消做了什么:

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

点进去查看disposed(),返回的是EmptyDisposable.INSTANCE,查看它的取消:

   @Override
    public void dispose() {
        // no-op
    }

什么都没做。因为整个事件只有一个数据,订阅就发射了,并没有后续事件。所以事件序列已经完成,不需要我们手动取消了。

有延迟不后续

        Single.just(1)
                .delay(1,TimeUnit.SECONDS)
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onSuccess(Integer integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

根据前面map的代码分析,return delay(time, unit, Schedulers.computation(), false);可是看到是自动切了线程,然后我们找到SingleDelay,查看代码:


    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {

        final SequentialDisposable sd = new SequentialDisposable();
        observer.onSubscribe(sd);
        source.subscribe(new Delay(sd, observer));
    }

SingleDelay被下面的内部类Delay订阅,这个SequentialDisposable就是实际的订阅时候的Disposable。它其实是个引用:


        @Override
        public void onSubscribe(Disposable d) {
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
        }

        @Override
        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

在订阅的时候,replace的是上游的Disposable,因为这个方法是上游调用并赋值的,然后,在onSuccessonError的时候,又一次replace,也就是说,在订阅发生,数据还在延迟的时候,是取消的上游,在收到后,是取消的下游。

有延迟有后续

Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Long aLong) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

根据前面delay的解析,先进入interval函数查看,这里看到会自动切线程,然后一个新建的ObservableInterval,进入查看:

    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is);

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

第二行的observer就是下游写的订阅者,onSubscribe传递的参数就是第一行new的IntervalObserver,那么它一定是个Disposable,果不其然:

 static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

但是看到这里,又疑惑了,这里类似线程的原子操作:

    @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

其实IntervalObserver继承了AtomicReference<Disposable>,又实现了Disposable,也就说,它可以用AtomicReference代理别的Disposable,也可以用自己的Disposable。它既可以歌手,也可以是个经纪人。
回到上面继续看subscribeActual

            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);

第一行有一个d,然后setResource,这个d就是IntervalObserver实际代理的Disposable。第一行的代码执行了切换线程,和向下游发射数据,所以取消,也就是取消了上游的发射数据,让上游停止发射数据。再看它的另一个方法:

    @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

downstream就是传是入的下游订阅者,在这里判断,如果DISPOSED,就不调用downstream.onNext(count++);,也就是下游接收不到了。

既取消了上游的发射,也取消了下游的接收。

上一篇 下一篇

猜你喜欢

热点阅读