Android知识

RxJava2 线程切换原理

2018-06-04  本文已影响13人  董江鹏

在移动端编程的时候基本上都会需要频繁地切换线程,因为复杂的耗时任务需要放到后台线程里去运行,UI绘制工作又只能在主线程里执行。Android 里一般用Handler实现,iOS 里通过系统提供的任务队列实现,但两者均不够优雅,无可避免地需要将代码包裹起来,Android 里需要用Runnable传递,iOS里则是闭包。如果遇到多次切换线程的情况,代码的缩进层级就会变深,可读性变差。

还好,ReactiveX 库优雅地解决了这个问题,而且还是跨平台的,RxJavaRxSwift 让移动端的线程切换工作变得优雅起来。

这篇文章主要试图说清楚RxJava2线程切换的实现过程。

操作符

RxJava2 的线程操作符是 subscribeOnobserveOn ,我们先按调用顺序一步步看。以下源码均有简化。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

subscribeOn 操作符将之前产生的 Observable 和 传入的 Scheduler封装成 ObservableSubscribeOn

   public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
       return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
   }

observeOn 操作符将之前产生的 Observable 和 传入的 Scheduler封装成 ObservableObserveOn

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }

    public final void subscribe(Observer<? super T> observer) {  
        subscribeActual(observer);    
    }
最后通过`subscribe`操作符设置观察者(`Observer`)来触发整个流程。

流程

subscribe 会调用上一个Observable(即ObservableObserveOn)的 subscribeActual() 方法。

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

此处将observeOn操作符后面的观察者(包括后面所有的操作符和Observer)和其将要运行的线程相关内容(Worker)封装成ObserveOnObserver 。此处的sourceobserveOn操作符之前的内容(切换之前产生的Observable)。

由此可见,每一次observeOn的使用,都会将后面的观察者(同上)和将要切换的线程内容封装起来。

我们可以看看ObserveOnObserver的执行过程。

      public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        ......

ObserveOnObserver里的数据处理都将在指定的Scheduler里执行,所以到这里可以证明,每次observeOn都会切换后面内容的执行线程。

再继续往上调用,回到前面的内容

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

假设该source是由subscribeOn产生的, 此处将会调用ObservableSubscribeOn里的subscribeActual() 方法。

    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

上一个操作产生的source(即Observable)的subscribe操作将在指定的Scheduler里执行, 每一次subscribeOn的调用都会在之前的Observable上封装一层,但数据的发射由最里层的Observable实现,即在第一个ObservableSubscribeOn封装里执行。

我们以最简单的 ObservableJust 举例,subscribeActual() 必然在第一个subscribeOn里的Scheduler里执行。

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

......

      public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

小结

文章从源码层面解释了RxJava2线程切换的原理,以及 subscribeOnobserveOn 两个操作符生效的场景。
整个流程还是很复杂,我先是读了几遍源码,但是在记忆里找起来总是很混乱,便写了个简单的demo,用断点调试工具一步步地走,才慢慢豁然开朗。如果看不明白,建议使用断点调试工具。

上一篇下一篇

猜你喜欢

热点阅读