RxJava 源码分析系列(五) -线程调度原理

2018-09-09  本文已影响0人  琼珶和予

  对于RxJava来说,简洁的线程切换操作是它优秀的地方之一。所以了解它的线程调度原理是完全有必要,这个既能帮助我们理解其中的奥妙,同时如果自己在开发当中需要做类似的需求,可以作为一个参考。同时,RxJava的线程调度原理也是RxJava当中比较难以理解的一部分。

1.概述

  说线程调度原理比较难以理解,其实我觉得就是涉及到的类比较多,然后他们之间的关系又难以梳理清楚,所以显得这部分的代码高深莫测。在这里,我先对线程调度涉及到的比较重要的类做一个小小的说明。

类名 含义
Schedulers 此类里面提供了我们需要使用的调度器对象
Scheduler 线程调度器,Observer或者Observable通过此类进行线程调度,每一个线程对应一个调度器
Worker 真正执行任务的类,每个调度器都需要获取一个Worker对象来执行任务
Runnable 在整个调度的过程中,一个任务会包装成一个Runnable来执行,包括线程池和Handler

  整个线程调度的核心在SchedulerWorker类中。接下来,我们重点分析这两个类,看这两个类怎么担任线程调度的重任的。

2. Scheduler的原理分析

  Scheduler是一个抽象类,我们来看看它的源码:

public abstract class Scheduler {
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

    public abstract Worker createWorker();
}

  createWorker方法是一个非常重要的方法,后续的Scheduler都必须在这个方法里面返回一个Worker。所以每一个Scheduler都对应着一个Worker
  其次scheduleDirect方法也是非常重要,当一个观察者被subscribe时,会调用到此方法来。当然这时候可能还不清楚是怎么调用到这里来的,我们也不用太害怕,这里我们只需要知道会调用这个方法来的就行,后续我会详细的追踪源码来解释。
  如果此时的Scheduler是一个子线程的Scheduler,例如接下来要讲的NewThreadScheduler,那么此时提交到Worker去执行,肯定在NewThreadScheduler所在线程执行。同理,如果是Android主线程的Scheduler,肯定就在主线程里面执行了。
  我们发现在这个方法里面,先是调用了createWorker创建一个Worker对象,然后后续经过一系列的包装,将任务包装成为一个Runnable,然后调用Workerschedule方法来执行任务。
  接下来,我们看几个Scheduler的子类。

(1).NewThreadScheduler

public final class NewThreadScheduler extends Scheduler {
    final ThreadFactory threadFactory;
    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

  我简略了一下NewThreadScheduler的代码,让它看起来更加的清晰。整个过程,我们发现在createWorker方法里面返回了一个 NewThreadWorker对象,其他感觉也没什么东西,是不是感觉咱们分析错了。当然不是,核心在Worker里面,当然此时不是分析Worker的时机。我们只需要记住整个NewThreadScheduler工作流程。

(2).HandlerScheduler

  我们再来看看HandlerScheduler的源码:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
}

  在Android当中,如果想要在主线程中工作,Handler是必不可少的。所以,我们在HandlerScheduler中看到Handler也是理所当然的。
  不过,这里我们发现,HandlerSchedulerscheduleDirect方法并没有去调用createWorker方法来获取一个Worker对象,而是直接将交给了handler来执行。这是为什么?
  在这里,我们得区分scheduleDirect方法的调用时机,scheduleDirect不是每个Scheduler里面都会被调用,而是这有上游的Scheduler才会被调用,也就是subscribeOn里面的Scheduler,在observeOn里面的Scheduler走的是另一个流程。
  所以,当HandlerSchedulerscheduleDirect方法被调用时,此时肯定是上游在工作,这样直接提交给Handler来执行了就是了,不必要去绕圈子。

3. Worker的工作原理

  现在我们来看看Worker的相关源码:

    public abstract static class Worker implements Disposable {
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
    }

  我将很多繁琐的代码省去了,Worker的代码看上就是这样的,其中,我们发现就有一个 schedule方法。这个方法之前在SchedulerscheduleDirect方法里面我们看到过,在scheduleDirect方法里面传递一个Runnable来执行。
  我们来看一个Worker的子类,看看是怎么执行的。

(1).NewThreadWorker

  先来看看NewThreadWorker的源码:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
}

  好像代码有点长😅,但是不急,我们一一的来分析。
  首先在构造方法里面,创建了一个线程池的对象,关于线程池相关的知识,这个属于Java基础知识,这里就不进行讲解,我默认大家都懂哈😂。
 然后就是一系列的schedule调用,最终会调用到scheduleActual方法里面去。scheduleActual方法里面看是做了很多的操作,其实根本目的就是调用ScheduledExecutorServicesubmit方法提交一个任务而已。
  对的,NewThreadWorker就是这么简单,是不是之前将它想的太难了?哈哈哈😂
  整个SchedulerWorker的调用差不多就是这样的,但是是不是觉得,现在还是一脸懵逼,看到这里,我们还是不知道RxJava究竟是怎么进行线程调度的。
  不急,接下来,我们将一个例子入手,由浅入深的带领大家熟悉线程调度的奥妙。我们熟悉了SchedulerWorker,对其他的才会很快了解。

4. subscribeOn的工作流程

  先来举一个栗子:

    Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
      }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(Integer integer) {
      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
    });

  是不是觉得这个栗子非常的熟悉,像极了前世的恋人🤓。熟悉就好,就怕大家淡忘了自己心中那个人。
  这个使用方法,其他的大佬不知道讲解了多少,在这里我就不重复了,直接开门见山的分析subscribeOn方法。
  subscribeOn方法主要是跟ObservableSubscribeOn类有关。我们来看看ObservableSubscribeOn类:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

  首先,ObservableSubscribeOn的构造方法里面多了一个Scheduler参数,这个参数到底是谁的对象得取决于在subscribeOn时。上面的栗子,我们传递的是schedulers.newThread(),所以这里的scheduler就是NewThreadScheduler的对象。
  在subscribeActual方法里面,我们发现:

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

  先来看看SubscribeTask是什么东西:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

  SubscribeTask从根本来说就是一个Runnable,当这个Runnable被执行的时候,也就是run方法被调用时,我们发现此时调用了subscribe方法,此时任务就正式启动。假如,我们在向后台获取数据,此时就已经开始获取了。
  然后,我们发现调用了SchedulerscheduleDirect,还记得这个方法在做什么吗?这个方法我们在前面讲解Scheduler时已经讲解,在这个方法里面显示调用onCreateWorker方法来获取一个Worker对象,然后将我们的任务提交给Worker来执行。
  这就是整个subscribeOn的流程,是不是很简单?

5. observeOn的工作流程

  observeOn方法主要是表示下游的方法执行所在的线程,跟subscribeOn方法差不多,observeOn主要跟ObservableObserveOn,我们来看看ObservableObserveOn的相关代码:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

  还是先来看看构造方法,同样的,scheduler也是一个调度器,这里由于Android的主线程,所以是HandlerSchedulerdelayError表示是否延迟抛出异常;bufferSize表示缓冲区的大小,如果上游的发射速度比较快的话,这里有可能出现背压问题。
  然后看看subscribeActual方法里面,我们发现,在这里,调用了onCreateWorker方法来获取一个Worker对象,然后subscribe了一个ObserveOnObserver对象,传进去了一些相关参数。看来真正的核心在ObserveOnObserver这里面。
  不过在这里,我们发现ObservableObserveOnObservableSubscribeOnsubscribeActual方法还是有很大的区别,在ObservableObserveOnsubscribeActual方法没有调用Scheduler的相关方法。之前,我们说了,scheduleDirect方法只在上游执行,从这里就可以看出来。
  我们来看看ObserveOnObserver相关代码,ObserveOnObserver比较重要的是onNext方法,看看在方法里面是怎么将数据分发到主线程去执行的。

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

  onNext方法还是比较简单,先将数据放在队列里面去,然后在调用schedule方法。我们再来看看schedule方法:

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

  这里getAndIncrement() == 0的意思比较好理解,表示当前队列中只有一个数据,就调用Workerschedule方法。到这里,我们根本没有看到调用Observer的onNext方法,是不是分析错了?肯定不是的,我们来看看Workerschedule方法。这里的WorkerHandlerWorker,所以我们看一下HandlerWorkerschedule方法:

        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

  看到写了这么多,其实就一句话,将传递进来的Runnable交给Handler来执行。那个这个Runnable是谁?就是ObservableObserveOn自己,Handler会执行Runnablerun方法。
  哈哈,现在得回来ObservableObserveOnrun方法:

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

  这里的两个drain方法着实将我吓着了。其实,认真想一想,跟BufferAsyncEmitter里面的drain方法差不多,这里就不深入的讲解,反正这里就是将数据发送到下游的真正操作,而且是在observeOn方法里面传递的那个线程执行。

6.总结

  RxJava的线程调度这部分的知识着实是不好理解,所以尽管我在这里说的天花乱坠,还是得需要我们亲自来看看这部分的代码。在这里,我对这部分的知识做一个简单的总结。
  1.每种类型线程对一个Scheduler,而每个Scheduler对应一个Worker,真正操作都是Worker来完成的。
  2.子线程执行的根本就是将我们的subscribe方法调用放在一个Runnable里面去执行,类似于代理机制,又像装饰者模式。

上一篇 下一篇

猜你喜欢

热点阅读