RxJava之Schedulers源码介绍

2019-06-01  本文已影响0人  103style

转载请以链接形式标明出处:
本文出自:103style的博客

Base on RxJava 2.X

简介

RxJava 的 Schedulers 提供了以下五种 Scheduler(调度器):

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    TRAMPOLINE = TrampolineScheduler.instance();
}

Schedulers.single() 为例介绍

如果我们没有调用 setInitXXSchedulerHandler 或者 setXXSchedulerHandler 自己实现调度器的话(XX 代表上面除了 TRAMPOLINE 的四种调度器的名字),我们开发中用到的 Schedulers.io(); Schedulers.computation(); Schedulers.newThread(); Schedulers.single(); 实际上就是对应的 XXTaskcall()方法返回的 Scheduler 对象,即对应的 XXScheduler 对象。

public static void setInitSingleSchedulerHandler(@Nullable Function<? super Callable<Scheduler>, ? extends Scheduler> handler) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    onInitSingleHandler = handler;
}

public static void setSingleSchedulerHandler(@Nullable Function<? super Scheduler, ? extends Scheduler> handler) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    onSingleHandler = handler;
}

以下是 Schedulers.single() 的源码介绍:

所以 Schedulers.single() 实际返回的是 SingleScheduler 对象.
同样的:
Schedulers.io(); 实际返回的是 IoScheduler 对象
Schedulers.computation(); 实际返回的是 ComputationScheduler 对象
Schedulers.newThread(); 实际返回的是 NewThreadScheduler 对象


SingleScheduler 源码介绍

public final class SingleScheduler extends Scheduler {
    final ThreadFactory threadFactory;
    final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

    private static final String KEY_SINGLE_PRIORITY = "rx2.single-priority";
    private static final String THREAD_NAME_PREFIX = "RxSingleScheduler";

    static final RxThreadFactory SINGLE_THREAD_FACTORY;

    static final ScheduledExecutorService SHUTDOWN;
    static {
        SHUTDOWN = Executors.newScheduledThreadPool(0);
        SHUTDOWN.shutdown();

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_SINGLE_PRIORITY, Thread.NORM_PRIORITY)));

        SINGLE_THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true);//1.1
    }

    public SingleScheduler() { //1.0
        this(SINGLE_THREAD_FACTORY);
    }

    public SingleScheduler(ThreadFactory threadFactory) {//2.0
        this.threadFactory = threadFactory;
        executor.lazySet(createExecutor(threadFactory));//4.0
    }

    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {//3.0
        return SchedulerPoolFactory.create(threadFactory);
    }
    ...
}

我们之前在 Rxjava之timer和interval操作符源码解析 介绍过 timer操作符在订阅的时候会执行ObservableTimersubscribeActual 方法,

public void subscribeActual(Observer<? super Long> observer) {
    TimerObserver ios = new TimerObserver(observer);
    observer.onSubscribe(ios);
    Disposable d = scheduler.scheduleDirect(ios, delay, unit);
    ios.setResource(d);
}

其中的 scheduler.scheduleDirect(ios, delay, unit)中 会通过createWorker()创建一个 Worker

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker(); //
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

NewThreadScheduler 源码介绍

SingleScheduler类似NewThreadScheduler也是构建了一个核心线程数为1ScheduledExecutorService
区别就是 NewThreadScheduler 每次调用 Schedulers.newThread() 都是重新创建了一个新的线程池, 不需要去记录之前运行的任务,每个任务之前不会有什么关联,所以使用得时候要注意。
以下代码是 NewThreadWorkerscheduleDirect方法:

public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    try {
        Future<?> f;
        if (delayTime <= 0L) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delayTime, unit);
        }
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}

ScheduledDirectTask:执行任务的返回值为 null

public final class ScheduledDirectTask extends AbstractDirectTask implements Callable<Void> {
    private static final long serialVersionUID = 1811839108042568751L;
    public ScheduledDirectTask(Runnable runnable) {
        super(runnable);
    }
    @Override
    public Void call() throws Exception {
        runner = Thread.currentThread();
        try {
            runnable.run();
        } finally {
            lazySet(FINISHED);
            runner = null;
        }
        return null;
    }
}

ComputationScheduler 源码介绍

ComputationSchedulerRxjava之timer和interval操作符源码解析 中已经介绍过,就不再赘述了。


IoScheduler 源码介绍


小结

Schedulers.single() 实际返回的是 SingleScheduler
Schedulers.io() 实际返回的是 IoScheduler
Schedulers.computation() 实际返回的是 ComputationScheduler
Schedulers.newThread() 实际返回的是 NewThreadScheduler

createWorker() 返回的值分别为:
Schedulers.single()ScheduledWorker
Schedulers.io()ThreadWorker
Schedulers.computation()PoolWorker
Schedulers.newThread()NewThreadWorker

SingleSchedulerSchedulers.io()NewThreadSchedulerSchedulers.computation() 最终都是通过 Executors.newScheduledThreadPool(1, factory);构建的核心线程数为1的线程池。

区别就是 Schedulers.newThread() 每次都是创建新的线程池, 而其他的都是服用之前已经创建得线程池!!! 所以要慎重选择。

以上

上一篇下一篇

猜你喜欢

热点阅读