RxJava的线程调度源码解析

2019-03-26  本文已影响0人  nmssdmf

代码调用

        // Starting point of the walkthrough: a source built from the single value 1.
        Observable.just(1)
                // Scheduler used for the subscription side (Schedulers.io()).
                .subscribeOn(Schedulers.io())
                // Scheduler used for delivering callbacks (the Android main-thread scheduler).
                .observeOn(AndroidSchedulers.mainThread())
                // Subscribe with an onNext-only consumer.
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        // Intentionally empty: the example only exists to trace the call chain.

                    }
                });

直接进入主题,先看subscribe中调用了哪些方法

    //Observable.java
    /**
     * Subscribes with only an {@code onNext} callback.
     * <p>
     * Delegates to the four-argument overload, passing {@code Functions.ON_ERROR_MISSING},
     * {@code Functions.EMPTY_ACTION} and {@code Functions.emptyConsumer()} for the callbacks
     * the caller did not supply.
     *
     * @param onNext consumer invoked for each emitted item
     * @return the {@code Disposable} returned by the delegated overload
     */
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(
                onNext,
                Functions.ON_ERROR_MISSING,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer());
    }

    /**
     * Subscribes using individual callbacks for each signal type.
     * <p>
     * All four callbacks are null-checked (in declaration order), wrapped into a single
     * {@code LambdaObserver}, and that observer is passed to {@code subscribe(Observer)}.
     *
     * @param onNext      consumer for emitted items; must not be null
     * @param onError     consumer for the error signal; must not be null
     * @param onComplete  action for the completion signal; must not be null
     * @param onSubscribe consumer receiving the {@code Disposable}; must not be null
     * @return the created {@code LambdaObserver}, exposed to the caller as a {@code Disposable}
     */
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        // Validation order is kept so the first null argument determines the message.
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        final LambdaObserver<T> lambdaObserver =
                new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(lambdaObserver);
        return lambdaObserver;
    }

    /**
     * Subscribes the given observer to this Observable.
     * <p>
     * The observer is first passed through the {@code RxJavaPlugins.onSubscribe} hook and the
     * (possibly replaced) observer is then handed to {@link #subscribeActual(Observer)}.
     *
     * @param observer the observer to subscribe; must not be null
     * @throws NullPointerException if {@code observer} is null, if the hook returns null, or as a
     *         wrapper (with the original as cause) around any non-fatal failure thrown while subscribing
     */
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // Let the plugin hook substitute or wrap the observer.
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            // Operator-specific subscription logic lives in the concrete subclass.
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            // NullPointerExceptions are propagated unchanged.
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            // Report the failure to the global error hook above, then rethrow it wrapped in an NPE.
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    // subscribe(...) ultimately ends up in the Observable's subscribeActual method,
    // which is abstract and therefore supplied by each concrete Observable.
    protected abstract void subscribeActual(Observer<? super T> observer);

接下来我们看下subscribeOn方法中进行了什么操作

    //Observable.java
    /**
     * Returns an Observable that wraps this one in an {@code ObservableSubscribeOn}
     * bound to the given scheduler.
     * <p>
     * Because the returned object is what the rest of the fluent chain is called on, the
     * {@code subscribeActual} reached from {@code subscribe(...)} is the one implemented by
     * {@code ObservableSubscribeOn}.
     *
     * @param scheduler the scheduler to use; must not be null
     * @return the result of passing the new {@code ObservableSubscribeOn} through
     *         {@code RxJavaPlugins.onAssembly} (per the article, the same instance by default)
     */
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");

        final ObservableSubscribeOn<T> subscribeOnSource = new ObservableSubscribeOn<T>(this, scheduler);
        return RxJavaPlugins.onAssembly(subscribeOnSource);
    }

接下来我们看ObservableSubscribeOn的subscribeActual方法

//ObservableSubscribeOn.java
    /**
     * Wraps the downstream observer and defers the actual upstream subscription to the scheduler.
     *
     * @param observer the downstream observer
     */
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> subscribeOnObserver = new SubscribeOnObserver<T>(observer);

        // The downstream receives its Disposable before anything is scheduled.
        observer.onSubscribe(subscribeOnObserver);

        // SubscribeTask is handed to scheduleDirect as the Runnable to execute.
        final SubscribeTask subscribeTask = new SubscribeTask(subscribeOnObserver);
        // Keep the Disposable returned by the scheduler on the wrapper.
        subscribeOnObserver.setDisposable(scheduler.scheduleDirect(subscribeTask));
    }

接下来看scheduler.scheduleDirect,在此之前,我们先看下传入的Schedulers.io()
查看传入的Scheduler

    /**
     * Returns the IO scheduler.
     *
     * @return the {@code IO} instance after it has been passed through the
     *         {@code RxJavaPlugins.onIoScheduler} hook
     */
    public static Scheduler io() {
        // IO is the statically initialised scheduler examined next.
        final Scheduler ioScheduler = IO;
        return RxJavaPlugins.onIoScheduler(ioScheduler);
    }
 // Excerpt of the initialisation: IO is obtained by passing a new IOTask to the RxJavaPlugins.initIoScheduler hook.
IO = RxJavaPlugins.initIoScheduler(new IOTask());

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    // As this shows, the scheduler behind Schedulers.io() is ultimately an IoScheduler.
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    //scheduler
    /**
     * Schedules a single Runnable on a freshly created worker, optionally after a delay.
     *
     * @param run   the task to execute
     * @param delay how long to wait before running the task
     * @param unit  the unit of {@code delay}
     * @return the {@code DisposeTask} wrapping the task and its worker
     */
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // createWorker() is implemented by the concrete scheduler; for the IoScheduler seen
        // above this is IoScheduler.createWorker, which yields an EventLoopWorker.
        final Worker worker = createWorker();

        final Runnable hookedRun = RxJavaPlugins.onSchedule(run);
        final DisposeTask disposeTask = new DisposeTask(hookedRun, worker);

        // For the IO scheduler this resolves to EventLoopWorker.schedule.
        worker.schedule(disposeTask, delay, unit);
        return disposeTask;
    }

接下来看EventLoopWorker

        /**
         * Schedules the action on the underlying thread worker unless this worker's
         * task container has already been disposed.
         *
         * @param action    the task to run
         * @param delayTime how long to wait before running the task
         * @param unit      the unit of {@code delayTime}
         * @return the scheduled task, or {@code EmptyDisposable.INSTANCE} when already disposed
         */
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (!tasks.isDisposed()) {
                // Delegates to NewThreadWorker.scheduleActual, tracking the task in `tasks`.
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

真正进入线程调度的代码,在NewThreadWorker中

    /**
     * Wraps the Runnable in a {@code ScheduledRunnable} and submits it to this worker's executor.
     *
     * @param run       the task to execute
     * @param delayTime delay before execution; values {@code <= 0} submit immediately
     * @param unit      the unit of {@code delayTime}
     * @param parent    optional container that tracks the task; may be null
     * @return the {@code ScheduledRunnable} wrapper (also returned when the container rejected
     *         it or the executor refused the submission)
     */
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        final Runnable hookedRun = RxJavaPlugins.onSchedule(run);
        final ScheduledRunnable scheduled = new ScheduledRunnable(hookedRun, parent);

        // Register with the container first; if add(...) reports failure the task is
        // returned without ever being submitted.
        if (parent != null && !parent.add(scheduled)) {
            return scheduled;
        }

        try {
            final Future<?> future;
            if (delayTime > 0) {
                // Delayed execution.
                future = executor.schedule((Callable<Object>) scheduled, delayTime, unit);
            } else {
                // Immediate execution on the worker's executor (the thread pool).
                future = executor.submit((Callable<Object>) scheduled);
            }
            scheduled.setFuture(future);
        } catch (RejectedExecutionException rejected) {
            // Undo the registration and report the rejection through the global error hook.
            if (parent != null) {
                parent.remove(scheduled);
            }
            RxJavaPlugins.onError(rejected);
        }

        return scheduled;
    }

所以到最后,真正进行线程调度的,其实是一个线程池。看完了subscribeOn,我们再来看看observeOn。首先看下AndroidSchedulers.mainThread()到底是哪个线程

    /**
     * Returns the scheduler associated with the Android main thread.
     *
     * @return {@code MAIN_THREAD} after it has been passed through the
     *         {@code RxAndroidPlugins.onMainThreadScheduler} hook
     */
    public static Scheduler mainThread() {
        final Scheduler mainScheduler = MAIN_THREAD;
        return RxAndroidPlugins.onMainThreadScheduler(mainScheduler);
    }

    // The main-thread scheduler: a Callable returning MainHolder.DEFAULT is passed to the
    // RxAndroidPlugins.initMainThreadScheduler hook, and the hook's result is stored here.
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    // Holder for the default main-thread scheduler instance.
    private static final class MainHolder {
        static final Scheduler DEFAULT
            // Looper.getMainLooper() shows that the Handler is bound to the main thread's Looper.
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

好,确定了这个问题,我们再继续往下看

    /**
     * Convenience overload of {@code observeOn} that passes {@code false} for
     * {@code delayError} and {@code bufferSize()} for the buffer size.
     *
     * @param scheduler the scheduler on which observers are notified
     * @return the Observable returned by the three-argument overload
     */
    public final Observable<T> observeOn(Scheduler scheduler) {
        final boolean delayError = false;
        final int prefetch = bufferSize();
        return observeOn(scheduler, delayError, prefetch);
    }

    /**
     * Returns an Observable that wraps this one in an {@code ObservableObserveOn}.
     *
     * @param scheduler  the scheduler on which observers are notified; must not be null
     * @param delayError forwarded unchanged to {@code ObservableObserveOn}
     * @param bufferSize forwarded to {@code ObservableObserveOn}; must be positive
     * @return the result of passing the new {@code ObservableObserveOn} through
     *         {@code RxJavaPlugins.onAssembly}
     */
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");

        // A new ObservableObserveOn instance is created for every call.
        final ObservableObserveOn<T> observeOnSource =
                new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
        return RxJavaPlugins.onAssembly(observeOnSource);
    }

接下来看ObservableObserveOn的subscribeActual方法

    /**
     * Subscribes to the source, routing signals through a worker of the configured scheduler.
     * When the scheduler is a {@code TrampolineScheduler} the observer is subscribed directly.
     *
     * @param observer the downstream observer
     */
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
            return;
        }

        // As with subscribeOn, a worker is created from the scheduler; for the main-thread
        // scheduler shown above that is HandlerScheduler.createWorker, returning a HandlerWorker.
        final Scheduler.Worker targetWorker = scheduler.createWorker();
        // The inner class ObserveOnObserver sits between the source and the downstream observer.
        source.subscribe(new ObserveOnObserver<T>(observer, targetWorker, delayError, bufferSize));
    }

//内部类ObserveOnObserver,以下是回调方法

        /**
         * Receives the upstream Disposable, selects the queue to use, and forwards
         * {@code onSubscribe} to the downstream observer.
         *
         * @param d the upstream Disposable
         */
        public void onSubscribe(Disposable d) {
            // Proceed only if DisposableHelper accepts d given the current upstream value.
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    // Ask the upstream which fusion mode it grants for ANY | BOUNDARY.
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        // SYNC: use the upstream itself as the queue, mark the source as done,
                        // and schedule right away.
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        // Trigger schedule().
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        // ASYNC: use the upstream as the queue; nothing is scheduled here.
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                // No fusion: buffer items in a dedicated queue created with bufferSize.
                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                downstream.onSubscribe(this);
            }
        }

        /**
         * Accepts an item from upstream and requests a drain via {@code schedule()}.
         *
         * @param t the emitted item
         */
        @Override
        public void onNext(T t) {
            // Items arriving after the terminal flag is set are ignored.
            if (done) {
                return;
            }

            // The item is enqueued here only when the source is not in ASYNC mode.
            // NOTE(review): presumably ASYNC-fused items are read from the upstream queue instead — confirm.
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // Trigger schedule().
            schedule();
        }

        /**
         * Records the upstream error, marks the sequence as done, and requests a drain.
         *
         * @param t the error signalled by upstream
         */
        @Override
        public void onError(Throwable t) {
            // An error arriving after termination is routed to the global error hook.
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            // The error is stored before the done flag is set.
            error = t;
            done = true;
            // Trigger schedule().
            schedule();
        }

        /**
         * Marks the sequence as done and requests a drain; repeated calls are ignored.
         */
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
             // Trigger schedule().
            schedule();
        }

        /**
         * Submits this observer to the worker, but only for the caller whose
         * {@code getAndIncrement()} observes a previous value of 0.
         */
        void schedule() {
            if (getAndIncrement() != 0) {
                // The counter was already non-zero, so this call does not submit again.
                return;
            }
            // Every callback therefore ends up in worker.schedule.
            worker.schedule(this);
        }
//最终看一下HandlerWorker的schedule方法,一看源码,老朋友了,Handler就不解释了
        /**
         * Posts the Runnable to the Handler as a delayed Message.
         *
         * @param run   the task to execute; must not be null
         * @param delay how long to wait before the message is delivered
         * @param unit  the unit of {@code delay}; must not be null
         * @return the {@code ScheduledRunnable} wrapping the task, or a disposed Disposable
         *         if this worker is disposed before or right after posting
         * @throws NullPointerException if {@code run} or {@code unit} is null
         */
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            // Nothing is posted once the worker has been disposed.
            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            // The wrapped task becomes the callback of a Message targeted at the Handler.
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            // The delay is converted to milliseconds for the Handler API.
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
上一篇下一篇

猜你喜欢

热点阅读