RxJava Source Code Analysis (3): Thread Switching with subscribeOn

2018-08-12  kakaxicm

Introduction

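In the previous posts we walked through how RxJava implements its extended observer pattern. Today we look at the second core piece of RxJava: subscribeOn, which switches the thread on which subscription happens.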

The subscribeOn method

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

It builds a new ObservableSubscribeOn object from the scheduler and the original Observable:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>{
    ...
}

It extends AbstractObservableWithUpstream. Note the two type parameters: together with the class name, they suggest that this class is related to Observable transformations.
AbstractObservableWithUpstream:

/**
 * Base class for operators with a source consumable.
 *
 * @param <T> the input source type
 * @param <U> the output type
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

As we can see, it wraps the original ObservableSource<T> while itself extending Observable<U>, where T is the upstream data type and U is the data type after transformation.
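To make the T/U relationship concrete, here is a minimal sketch in plain Java with no RxJava dependency. The names Source, WithUpstream, and MapOperator are hypothetical and invented for this illustration; they only mimic the shape of ObservableSource and AbstractObservableWithUpstream and are not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class UpstreamOperatorSketch {
    /** Hypothetical stand-in for ObservableSource: subscribing pushes items to the observer. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Keeps the upstream of type T and presents itself as a Source of U. */
    abstract static class WithUpstream<T, U> implements Source<U> {
        protected final Source<T> source;

        WithUpstream(Source<T> source) {
            this.source = source;
        }
    }

    /** An operator whose input type T and output type U differ. */
    static final class MapOperator<T, U> extends WithUpstream<T, U> {
        private final Function<T, U> mapper;

        MapOperator(Source<T> source, Function<T, U> mapper) {
            super(source);
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Consumer<U> observer) {
            // Subscribe to the upstream and convert each T into a U on the way down
            source.subscribe(item -> observer.accept(mapper.apply(item)));
        }
    }

    /** Maps the integers 1 and 2 to strings and collects what the observer receives. */
    public static List<String> demo() {
        Source<Integer> numbers = observer -> {
            observer.accept(1);
            observer.accept(2);
        };
        List<String> out = new ArrayList<>();
        new MapOperator<Integer, String>(numbers, n -> "item-" + n).subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

ObservableSubscribeOn uses the same base class with T and U being the same type, because it changes where subscription happens rather than what is emitted.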
Now let's go back to ObservableSubscribeOn:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
      final Scheduler scheduler;
      // The constructor receives the original ObservableSource and the Scheduler
      public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
   @Override
    public void subscribeActual(final Observer<? super T> s) {
        // Create a SubscribeOnObserver that wraps the original observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // Core code: scheduler.scheduleDirect hands source.subscribe(parent) to the scheduler
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
...
}

The core is scheduler.scheduleDirect: source.subscribe(parent) is wrapped in a Runnable and handed to the scheduler, which is how the subscription call gets moved to another thread.
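The same idea can be sketched in plain Java without RxJava. This is only an illustration under stated assumptions: Source and subscribeOn below are hypothetical stand-ins, and an ExecutorService plays the role of the Scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SubscribeOnSketch {
    /** Hypothetical stand-in for ObservableSource. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Mirrors the idea of ObservableSubscribeOn: the upstream is subscribed inside a scheduled Runnable. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Subscribes through a named single-thread executor and returns the thread name the source saw. */
    public static String subscribeThreadName(String threadName) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        CompletableFuture<String> seen = new CompletableFuture<>();
        // The source emits the name of whichever thread performs the subscription
        Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
        try {
            subscribeOn(source, executor).subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(subscribeThreadName("sketch-io"));
    }
}
```

The calling thread only submits the Runnable; the source's subscribe logic, and therefore its emission, runs on the executor's thread.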
Next, let's look at the core class Scheduler:

Thread switching: Scheduler and Worker

Scheduler

Scheduler is responsible for thread switching and supports delayed and periodic task scheduling.

/**
 * A {@code Scheduler} is an object that specifies an API for scheduling
 * units of work with or without delays or periodically.
 * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
 * (Annotation) Handles thread switching; supports delayed and periodic task scheduling.
 */
public abstract class Scheduler {
    // Subclasses create the Worker, the unit that actually executes tasks
    public abstract Worker createWorker();
    // Allows the Scheduler to start its threads and accept tasks on them
    public void start() {
    }
    // Instructs the Scheduler to stop its threads and stop accepting tasks
    public void shutdown() {
    }
      
    // Schedules the task with no delay
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }    
    
    // Hands run to a Worker for scheduling
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        // Obtain a Worker
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // The Worker schedules the task and is disposed once the task has run
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }
// Schedules a periodic task
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
   }
   ....
}
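The scheduleDirect flow above (create a worker, run the task on it, dispose the worker afterwards) can be sketched as follows. MiniWorker is a hypothetical, heavily reduced Worker backed by a single-thread executor; it is an assumption made for illustration and not how any concrete RxJava Scheduler is built.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ScheduleDirectSketch {
    /** Hypothetical reduced Worker: runs tasks on its own thread until disposed. */
    static final class MiniWorker {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final AtomicBoolean disposed = new AtomicBoolean();

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            // Only the first dispose call shuts the executor down
            if (disposed.compareAndSet(false, true)) {
                executor.shutdown();
            }
        }

        boolean isDisposed() {
            return disposed.get();
        }
    }

    /** Same shape as scheduleDirect: one worker per task, disposed in a finally block. */
    static MiniWorker scheduleDirect(Runnable run) {
        final MiniWorker w = new MiniWorker();
        w.schedule(() -> {
            try {
                run.run();
            } finally {
                w.dispose();
            }
        });
        return w;
    }

    /** Schedules one task and reports whether it ran and the worker was disposed afterwards. */
    public static boolean ranAndDisposed() {
        AtomicBoolean ran = new AtomicBoolean();
        MiniWorker w = scheduleDirect(() -> ran.set(true));
        try {
            // Returns true once the worker's executor has been shut down and has drained
            boolean terminated = w.executor.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && ran.get() && w.isDisposed();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(ranAndDisposed());
    }
}
```

Disposing in the finally block means the worker is released even when the task throws, which matches the try/finally in the quoted scheduleDirect.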

All of its scheduling goes through a Worker, so let's see what Worker contains:

Worker

public abstract static class Worker implements Disposable {
     // Schedules run for execution with no delay
    public Disposable schedule(Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
     }
     // Subclasses override this with the concrete scheduling logic
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
....
}

The main Scheduler implementations

IoScheduler

The source of this class is fairly long, so we only pick out the key parts.

/**
 * Scheduler that creates and caches a set of thread pools and reuses them if possible.
 */
public final class IoScheduler extends Scheduler {
// Atomic (thread-safe) reference to the CachedWorkerPool, which maintains the pool of reusable ThreadWorkers
final AtomicReference<CachedWorkerPool> pool;
...
// Initializes the CachedWorkerPool
public IoScheduler() {
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
   ...
   @Override
    // Key method: builds an EventLoopWorker on top of the ThreadWorker pool
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ....
}

Now let's look at EventLoopWorker:

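    static final class EventLoopWorker extends Scheduler.Worker {
        // Tracks the tasks scheduled through this worker so they can be disposed together
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            // Take a ThreadWorker from the cache pool
            this.threadWorker = pool.get();
        }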
      ....
   
        @Override
        public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            // Finally, delegate to threadWorker.scheduleActual to run the task
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

First, let's look at the pool.get() method:

static final class CachedWorkerPool implements Runnable{
     // Queue of idle ThreadWorkers that have not yet expired
      private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
       // Executor that evicts expired ThreadWorkers
       private final ScheduledExecutorService evictorService;
        CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
           ...
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                 // Run the ThreadWorker eviction repeatedly with a fixed delay of keepAliveTime
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            evictExpiredWorkers();
        }
        
        // Take a ThreadWorker from the pool, creating one if none is cached
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
            allWorkers.add(w);
            return w;
        }
        // Return a threadWorker to the pool
        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }
        // Evict ThreadWorkers whose keep-alive time has passed
        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }
        ....
}

CachedWorkerPool maintains a pool of reusable ThreadWorkers, and EventLoopWorker takes a ThreadWorker from it to execute tasks.
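The get/release/evict cycle can be illustrated with a small sketch. WorkerCache and CachedWorker are hypothetical names, and the clock is passed in explicitly so that the behaviour is deterministic; the real CachedWorkerPool reads the time itself and runs eviction from a scheduled executor.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerCacheSketch {
    /** Hypothetical cached worker: only an id and an expiration timestamp. */
    static final class CachedWorker {
        final int id;
        volatile long expirationTime;

        CachedWorker(int id) {
            this.id = id;
        }
    }

    /** Hypothetical keep-alive cache with the same get/release/evict shape as CachedWorkerPool. */
    static final class WorkerCache {
        private final ConcurrentLinkedQueue<CachedWorker> idle = new ConcurrentLinkedQueue<>();
        private final AtomicInteger created = new AtomicInteger();
        private final long keepAlive;

        WorkerCache(long keepAlive) {
            this.keepAlive = keepAlive;
        }

        /** Reuses an idle worker if one exists, otherwise creates a new one. */
        CachedWorker get() {
            CachedWorker w = idle.poll();
            return w != null ? w : new CachedWorker(created.incrementAndGet());
        }

        /** Refreshes the expiration time and puts the worker back in the idle queue. */
        void release(CachedWorker w, long now) {
            w.expirationTime = now + keepAlive;
            idle.offer(w);
        }

        /** Removes expired workers from the head of the queue and returns how many were evicted. */
        int evictExpired(long now) {
            int evicted = 0;
            for (CachedWorker w : idle) {
                if (w.expirationTime <= now) {
                    if (idle.remove(w)) {
                        evicted++;
                    }
                } else {
                    // Workers are queued in release order, so the first live one ends the scan
                    break;
                }
            }
            return evicted;
        }
    }

    /** Walks through reuse, a too-early eviction pass, a successful eviction, and a fresh worker. */
    public static String demo() {
        WorkerCache cache = new WorkerCache(100);
        CachedWorker a = cache.get();
        cache.release(a, 0);
        CachedWorker b = cache.get();
        boolean reused = a == b;
        cache.release(b, 0);
        int evictedEarly = cache.evictExpired(50);
        int evictedLate = cache.evictExpired(100);
        int nextId = cache.get().id;
        return "reused=" + reused + " evictedEarly=" + evictedEarly
                + " evictedLate=" + evictedLate + " nextId=" + nextId;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A released worker is handed out again while it is still within its keep-alive window; once the window has passed, eviction removes it and the next get() has to create a new one.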
Now let's return to threadWorker.scheduleActual and see how it schedules a task, again focusing on the key parts:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
     // The underlying thread pool
    private final ScheduledExecutorService executor;
    ...
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // Wrap the original run
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            // Hand the runnable to the thread pool: submit immediately, or schedule with a delay
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
    ...
}
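The submit-or-schedule branch in scheduleActual can be tried out with a plain ScheduledExecutorService. This is a simplified sketch: the static scheduleActual helper below is an assumption for illustration and leaves out ScheduledRunnable, the DisposableContainer, and the rejection handling shown above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ScheduleActualSketch {
    /** Runs the task right away when there is no delay, otherwise schedules it for later. */
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delayTime, unit);
    }

    /** Returns how many milliseconds passed between scheduling and the task actually running. */
    public static long elapsedMillis(long delayMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            AtomicLong ranAt = new AtomicLong();
            long start = System.nanoTime();
            Future<?> f = scheduleActual(executor, () -> ranAt.set(System.nanoTime()),
                    delayMillis, TimeUnit.MILLISECONDS);
            // Block until the task has finished
            f.get();
            return TimeUnit.NANOSECONDS.toMillis(ranAt.get() - start);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("no delay: " + elapsedMillis(0) + " ms");
        System.out.println("50 ms delay: " + elapsedMillis(50) + " ms");
    }
}
```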

That covers the overall flow of scheduling on the IO threads. Next, let's look at how the main thread is scheduled through HandlerScheduler.

HandlerScheduler

Callers switch to the main thread through the MainHolder.DEFAULT object:

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

A Handler bound to the main Looper is passed in to construct the HandlerScheduler.

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        // Wrap the original Runnable
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    
    // The class that finally performs the scheduling
    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            // Send the message to the main thread, which then executes run
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    // Wraps the original Runnable
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

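Having walked through how subscribeOn switches the subscription thread, consider this question: when subscribeOn is called several times in a row, why does only the first call take effect?
My understanding is as follows:
1. subscribeActual() hands source.subscribe(parent) to the Scheduler, so the thread switch happens at that call, and every subscription step upstream of it runs on the switched thread.
2. With chained calls, subscription travels from the bottom of the chain upward, so the topmost subscribeOn's scheduleDirect runs last. Its source.subscribe(parent) therefore runs on the thread specified by that topmost subscribeOn(xxxx).
3. The code that emits data is ultimately invoked from that topmost source.subscribe(parent), so it runs on that thread too.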
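The three points above can be checked with a small plain-Java sketch that chains two thread switches. Source and subscribeOn are hypothetical stand-ins rather than RxJava classes, and two named single-thread executors stand in for the two schedulers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class FirstSubscribeOnWinsSketch {
    /** Hypothetical stand-in for ObservableSource. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Subscribes to the upstream from inside a task run by the given executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Chains two subscribeOn calls and returns the name of the thread the source emitted on. */
    public static String emittingThread() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        CompletableFuture<String> seen = new CompletableFuture<>();
        // The source reports the thread on which it is subscribed and emits
        Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
        // Same order as source.subscribeOn(first).subscribeOn(second)
        Source<String> chained = subscribeOn(subscribeOn(source, first), second);
        try {
            chained.subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread());
    }
}
```

The outer switch runs first and moves subscription onto the "second" thread, but that task only submits work to the "first" executor. The source is therefore subscribed, and emits, on the thread chosen by the subscribeOn closest to it.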
In the next post we will continue with how the observer's callbacks are scheduled onto threads.
