Rxjava中的线程调度分析

2019-03-16  本文已影响0人  欧文Kira

使用Rxjava的一个开源小项目

吃个栗子

拿下面这个栗子说事:

        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Android");
                emitter.onNext("ios");
                emitter.onNext("Other");
                emitter.onComplete();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: "+s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

subscribeOn

subscribeOn()源码如下:

    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

ObservableSubscribeOn 类是一个装饰者,对下游的 Observer 进行装饰,很明显这里主要增加了一个 Scheduler 来做线程切换。
ObservableSubscribeOn 已经不再是从前的那个下游的Observer,Scheduler 的能力让它可以牛逼轰轰的了。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    // 保存线程调度器,该栗子中即Schedulers.io(),Schedulers.io()生成的一个线程调度对象,此对象是维护这一个线程池,让操作在io线程池中执行
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
   // 保存 ObservableSource,该栗子中即 observableCreate 类
        super(source);
        this.scheduler = scheduler;
    }
    // 订阅时调用
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        // 对 Observer 进行包装
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
         // 调用 下游(终点)Observer.onSubscribe()方法,所以onSubscribe()方法执行在订阅处所在的线程
        observer.onSubscribe(parent);
        // setDisposable()是为了将子线程的操作加入 Disposable 管理中
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 此时已经运行 io 线程中
            source.subscribe(parent);
        }
    }

上面的代码中,scheduler.scheduleDirect 将 SubscribeTask 加入到线程池中(此处为IO线程)执行。所以subscribeOn对线程切换是在
Observer 对 ObservableSubscribeOn 类进行 subscribe 时发生的。即,当一个 Observer 对 ObservableSubscribeOn 进行订阅时,后面
所有的操作都会在我们所指定线程中(此处为io)执行。

再来看看 SubscribeOnObserver 如何包装 observer:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        // 真正的下游(终点)观察者
        final Observer<? super T> downstream;
        // 用于保存上游的Disposable,以便在自身 dispose 时,连同上游一起 dispose
        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
        // onSubscribe()方法由上游调用,传入 Disposable,在本类中赋值给 this.upstream,加入管理。
        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        // 直接调用下游观察者的对应方法
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
        // 取消订阅时,连同上游 Disposable 一起取消
        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        // 这个方法在subscribeActual()中被手动调用,为了将Schedulers返回的Worker加入管理
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

SubscribeOnObserver 对象会持有一个observer对象,同时也会维护一个 Disposable 对象,用于保存上游的Disposable,
以便在自身 dispose 时,连同上游一起 dispose。这里可以看到 SubscribeOnObserver 其实是 observer 对象的一个装饰者。

我们总结一下subscribeOn(Schedulers.xxx())的过程:

为什么 subscribeOn(Schedulers.xxx())切换线程N次,总是以第一次为准,或者说离源 Observable 最近的那次为准?

线程调度 observeOn

增加一个 observeOn (AndroidSchedulers.mainThread()),就完成了观察者线程的切换。

                .subscribeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) {
                        return s+s;
                    }
                })
                .observeOn(Schedulers.io())
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {\
   //本例是 AndroidSchedulers.mainThread()
    final Scheduler scheduler;
    //默认false
    final boolean delayError;
    //默认128
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 创建出一个主线程的 Worker
            Scheduler.Worker w = scheduler.createWorker();
           // 订阅上游数据源,
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

subscribeActual 主要做了两件事:

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        // 下游的观察者
        final Observer<? super T> downstream;
        // 对应Scheduler里的Worker
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
        // 上游被观察者 push 过来的数据都存在这里
        SimpleQueue<T> queue;

        Disposable upstream;
        // 如果onError了,保存对应的异常
        Throwable error;
        //是否完成
        volatile boolean done;
        //是否取消
        volatile boolean disposed;
        // 代表同步发送 异步发送 
        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
               //创建一个queue 用于保存上游 onNext() push的数据
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
               //回调下游观察者onSubscribe方法
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            // 执行过error / complete 会是true
            if (done) {
                return;
            }
            // 如果数据源类型不是异步的, 默认不是
            if (sourceMode != QueueDisposable.ASYNC) {
           // 将上游push过来的数据 加入 queue里
                queue.offer(t);
            }
          // 开始进入对应 Workder 线程,在线程里将 queue里的 t 取出发送给下游Observer
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            //开始调度
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            //开始调度
            schedule();
        }

        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //该方法需要传入一个线程, 注意看本类实现了Runnable的接口,所以查看对应的run()方法
                worker.schedule(this);
            }
        }

         //从这里开始,这个方法已经是在Workder对应的线程里执行的了
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                // 取出 queue 里的数据发送
                drainNormal();
            }
        }
    }

上面代码主要功能如下:

  1. 将上游push过来的数据加入 ObserveOnObserver 对象的一个队列中(SpscLinkedArrayQueue)

  2. ObserveOnObserver 的Scheduler.Worker worker 变量保存着我们所指定的线程,此处为 AndroidSchedulers.mainThread()。

  3. 然后 worker 将 ObserveOnObserver 对象放入指定的线程中(此处为 AndroidSchedulers.mainThread()),去取出其队列中
    存储的来自上游的事件。

还需要关注几点:

关于多次调用生效问题。对比 subscribeOn() 切换线程是在 subscribeActual()里做的,只是主动切换了上游的订阅线程,从而影响其发射数据时所在的线程。
而直到真正发射数据之前,任何改变线程的行为,都会生效(影响发射数据的线程)。所以 subscribeOn() 只生效一次。observeOn()是一个主动的行为,并且切换线程后会立刻发送数据,所以会生效多次。

小结

线程调度subscribeOn():

线程调度 observeOn():

上一篇 下一篇

猜你喜欢

热点阅读