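RxJava Source Code Analysis (3): Thread Switching with subscribeOn
Introduction
Previously we walked through how RxJava implements its extended observer pattern. Today we look at the second core piece of RxJava: subscribeOn, which switches the thread on which the subscription runs.
The subscribeOn method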
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
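It builds a new ObservableSubscribeOn object from the scheduler and the original Observable:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    ...
}
It extends AbstractObservableWithUpstream. Note the two type parameters: together with the class name, they suggest that this class has to do with transforming an Observable.
AbstractObservableWithUpstream: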
/**
 * Base class for operators with a source consumable.
 *
 * @param <T> the input source type
 * @param <U> the output type
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}
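As we can see, it wraps the original ObservableSource<T> while itself extending Observable<U>, where T is the upstream item type and U is the item type after the transformation. For ObservableSubscribeOn both are T, because subscribeOn changes the thread rather than the data.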
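To make the role of the two type parameters concrete, here is a minimal sketch in plain Java. It does not use RxJava: UpstreamSource, WithUpstream and MapSketch are hypothetical names invented for this illustration, and the operator shown is a simple mapping, chosen only because it makes T and U differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for ObservableSource<T>.
interface UpstreamSource<T> {
    void subscribe(Consumer<T> observer);
}

// Holds an upstream of T while itself being a source of U.
abstract class WithUpstream<T, U> implements UpstreamSource<U> {
    protected final UpstreamSource<T> source;

    WithUpstream(UpstreamSource<T> source) {
        this.source = source;
    }
}

// Here T and U differ: each upstream item is converted before going downstream.
class MapSketch<T, U> extends WithUpstream<T, U> {
    private final Function<T, U> mapper;

    MapSketch(UpstreamSource<T> source, Function<T, U> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Consumer<U> observer) {
        source.subscribe(item -> observer.accept(mapper.apply(item)));
    }
}

class UpstreamDemo {
    static List<String> run() {
        UpstreamSource<Integer> numbers = observer -> {
            observer.accept(1);
            observer.accept(2);
        };
        List<String> out = new ArrayList<>();
        new MapSketch<Integer, String>(numbers, n -> "item-" + n).subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item-1, item-2]
    }
}
```

An operator such as subscribeOn fits the same base class with T and U set to the same type, since it only changes where the upstream subscription runs.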
Now let's go back to ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    // The constructor takes the original ObservableSource and the Scheduler
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // Build a SubscribeOnObserver that wraps the original observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // Core code: scheduler.scheduleDirect hands source.subscribe(parent) to the scheduler
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
    ...
}
The core is scheduler.scheduleDirect: the call source.subscribe(parent) is wrapped in a Runnable and handed to the scheduler, which is how the subscription is moved onto another thread.
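The mechanism can be sketched without RxJava. In the sketch below, MiniSource is a hypothetical stand-in for ObservableSource, and a plain ExecutorService plays the role of the Scheduler; both are assumptions made for illustration, not the library's API. The source reports the name of the thread that runs its subscribe logic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableSource: subscribing pushes items to the observer.
interface MiniSource<T> {
    void subscribe(Consumer<T> observer);
}

class SubscribeOnSketch {

    // Same idea as subscribeActual: the upstream subscription is wrapped in a
    // Runnable and handed to the executor instead of running on the caller's thread.
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // Returns the name of the thread on which the source's subscribe logic ran.
    static String runDemo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        MiniSource<String> source = observer -> observer.accept(Thread.currentThread().getName());

        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        subscribeOn(source, io).subscribe(name -> {
            seen[0] = name;
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints sketch-io
    }
}
```

Although subscribe is called from the main thread, the source's own logic runs on the executor's thread, because only the Runnable decides where source.subscribe is invoked.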
Next, let's look at the core class Scheduler:
Thread switching: Scheduler and Worker
Scheduler
/**
 * A {@code Scheduler} is an object that specifies an API for scheduling
 * units of work with or without delays or periodically.
 * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
 * Responsible for thread switching; supports delayed and periodic task scheduling.
 */
public abstract class Scheduler {
    // Subclasses create the Worker, the unit that actually executes tasks
    public abstract Worker createWorker();

    // Starts the scheduler (no-op by default)
    public void start() {
    }

    // Shuts the scheduler down (no-op by default)
    public void shutdown() {
    }

    // Schedules the task with no delay
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    // Hands run to a Worker for scheduling
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        // Obtain a Worker
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // The Worker schedules the task and is disposed once the task has finished
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }

    // Schedules a periodic task
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
    }
    ....
}
All of its scheduling goes through a Worker, so let's see what that class contains:
Worker
public abstract static class Worker implements Disposable {
    // Schedules run with no delay
    public Disposable schedule(Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }

    // Subclasses implement the actual scheduling
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
    ....
}
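The pattern in scheduleDirect (one Worker per task, released in a finally block once the task has run) can be sketched as follows. InlineWorker is a hypothetical worker invented for this example; it runs the task on the calling thread, which is an assumption made to keep the sketch deterministic and is not how the real schedulers behave.

```java
import java.util.ArrayList;
import java.util.List;

class DirectScheduleSketch {

    // Hypothetical minimal worker: runs the task inline and remembers whether it was disposed.
    static class InlineWorker {
        boolean disposed;

        void schedule(Runnable task) {
            task.run();
        }

        void dispose() {
            disposed = true;
        }
    }

    // Same shape as scheduleDirect: create a worker for this one task and
    // dispose it in finally, so it is released even if the task throws.
    static InlineWorker scheduleDirect(Runnable run) {
        final InlineWorker w = new InlineWorker();
        w.schedule(() -> {
            try {
                run.run();
            } finally {
                w.dispose();
            }
        });
        return w;
    }

    static List<String> runDemo() {
        List<String> events = new ArrayList<>();
        InlineWorker worker = scheduleDirect(() -> events.add("task ran"));
        events.add("disposed=" + worker.disposed);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [task ran, disposed=true]
    }
}
```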
The main Scheduler implementations
IoScheduler
The source of this class is fairly long, so we'll focus on the key parts.
/**
 * Scheduler that creates and caches a set of thread pools and reuses them if possible.
 */
public final class IoScheduler extends Scheduler {
    // Thread-safe reference to the CachedWorkerPool, which maintains the pool of reusable ThreadWorkers
    final AtomicReference<CachedWorkerPool> pool;
    ...
    // Initializes the CachedWorkerPool
    public IoScheduler() {
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    ...
    // Key method: builds an EventLoopWorker on top of the ThreadWorker pool
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ....
}
Next, EventLoopWorker:
static final class EventLoopWorker extends Scheduler.Worker {
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        // Take a ThreadWorker from the cache pool
        this.threadWorker = pool.get();
    }
    ....
    @Override
    public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        // Finally, threadWorker.scheduleActual is called to run the action
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}
First, the pool.get() method:
static final class CachedWorkerPool implements Runnable {
    // Queue of idle ThreadWorkers that have not expired yet
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    // Executor that runs the eviction of expired ThreadWorkers
    private final ScheduledExecutorService evictorService;

    CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        ...
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            // Run the ThreadWorker eviction repeatedly, with a fixed delay between runs
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }

    @Override
    public void run() {
        evictExpiredWorkers();
    }

    // Takes a ThreadWorker from the pool
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
        allWorkers.add(w);
        return w;
    }

    // Returns a threadWorker to the pool
    void release(ThreadWorker threadWorker) {
        // Refresh expire time before putting worker back in pool
        threadWorker.setExpirationTime(now() + keepAliveTime);
        expiringWorkerQueue.offer(threadWorker);
    }

    // Evicts expired ThreadWorkers
    void evictExpiredWorkers() {
        if (!expiringWorkerQueue.isEmpty()) {
            long currentTimestamp = now();
            for (ThreadWorker threadWorker : expiringWorkerQueue) {
                if (threadWorker.getExpirationTime() <= currentTimestamp) {
                    if (expiringWorkerQueue.remove(threadWorker)) {
                        allWorkers.remove(threadWorker);
                    }
                } else {
                    // Queue is ordered with the worker that will expire first in the beginning, so when we
                    // find a non-expired worker we can stop evicting.
                    break;
                }
            }
        }
    }
    ....
}
CachedWorkerPool maintains a pool of reusable ThreadWorkers, and EventLoopWorker takes a ThreadWorker from it to execute tasks.
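The get / release / evict cycle can be sketched in a few lines of plain Java. PooledWorker is a hypothetical stand-in for ThreadWorker that carries only an id and an expiration time, and the clock is a manually advanced field rather than a real time source; both are assumptions made so the behaviour is deterministic. The sketch is single-threaded and leaves out the shutdown handling and the background evictor of the real class.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class WorkerPoolSketch {

    // Hypothetical stand-in for ThreadWorker.
    static class PooledWorker {
        final int id;
        long expirationTime;

        PooledWorker(int id) {
            this.id = id;
        }
    }

    private final ConcurrentLinkedQueue<PooledWorker> idle = new ConcurrentLinkedQueue<>();
    private final long keepAlive;
    private int created;
    long now; // manual clock, advanced by the demo

    WorkerPoolSketch(long keepAlive) {
        this.keepAlive = keepAlive;
    }

    // Reuse an idle worker if one exists, otherwise create a new one.
    PooledWorker get() {
        PooledWorker cached = idle.poll();
        return cached != null ? cached : new PooledWorker(++created);
    }

    // Refresh the expiration time and put the worker back into the idle queue.
    void release(PooledWorker worker) {
        worker.expirationTime = now + keepAlive;
        idle.offer(worker);
    }

    // Workers are queued in release order, so the first non-expired one ends the scan.
    void evictExpired() {
        for (PooledWorker worker : idle) {
            if (worker.expirationTime <= now) {
                idle.remove(worker);
            } else {
                break;
            }
        }
    }

    static String runDemo() {
        WorkerPoolSketch pool = new WorkerPoolSketch(60);
        PooledWorker first = pool.get();   // nothing cached yet: a new worker is created
        pool.release(first);
        PooledWorker reused = pool.get();  // the released worker is handed out again
        pool.release(reused);
        pool.now = 61;                     // move past the keep-alive window
        pool.evictExpired();               // the idle worker has expired and is dropped
        PooledWorker fresh = pool.get();   // so a second worker has to be created
        return first.id + "," + reused.id + "," + fresh.id;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 1,1,2
    }
}
```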
Now let's go back to threadWorker.scheduleActual and see how it schedules a task (ThreadWorker extends NewThreadWorker, where the method is defined), again focusing on the key parts:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    // Executor that runs the tasks
    private final ScheduledExecutorService executor;
    ...
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // Wrap the original run
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            // Hand the runnable to the executor
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
    ...
}
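The branch on delayTime, and the reason the resulting Future is kept, can be tried out with a plain ScheduledExecutorService. This is only a sketch of the idea: scheduleActual below is a simplified helper written for this example, without the ScheduledRunnable wrapper, the parent container or the rejection handling of the real method, and cancelling the Future stands in for what a dispose would presumably do.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ScheduleActualSketch {

    // No delay: submit right away. Otherwise: schedule with the requested delay.
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delayTime, unit);
    }

    static String runDemo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicBoolean immediateRan = new AtomicBoolean();
        AtomicBoolean delayedRan = new AtomicBoolean();
        try {
            Future<?> immediate = scheduleActual(executor, () -> immediateRan.set(true), 0, TimeUnit.SECONDS);
            Future<?> delayed = scheduleActual(executor, () -> delayedRan.set(true), 10, TimeUnit.SECONDS);
            immediate.get();        // wait until the undelayed task has run
            delayed.cancel(false);  // keeping the Future is what makes cancellation possible
            return "immediateRan=" + immediateRan.get()
                    + ", delayedRan=" + delayedRan.get()
                    + ", delayedCancelled=" + delayed.isCancelled();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints immediateRan=true, delayedRan=false, delayedCancelled=true
        System.out.println(runDemo());
    }
}
```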
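So far we have walked through the general flow of scheduling on the IO threads. Next, let's look at scheduling on the main thread: HandlerScheduler.
HandlerScheduler
Code that switches to the main thread uses the MainHolder.DEFAULT object:
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
A Handler bound to the main Looper is passed in to construct the HandlerScheduler.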
final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        run = RxJavaPlugins.onSchedule(run);
        // Wrap the original Runnable
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    // The class that does the actual scheduling
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            // Send the message to the main thread, which then executes run
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    // Wraps the original Runnable
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;
        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
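Having gone through how subscribeOn switches the subscription thread, we can consider this question: when subscribeOn is called several times in a row, why does only the first call take effect?
My understanding is as follows:
1. subscribeActual() hands the work to the Scheduler, and source.subscribe(parent) runs inside the scheduled Runnable. The thread switch takes effect from that statement on, so everything upstream of this operator is subscribed on the new thread.
2. With consecutive calls, the subscription travels from the bottom of the chain to the top, so the topmost switch (scheduleDirect), the one belonging to the first subscribeOn call, runs last. At that point source.subscribe(parent) runs on the thread specified by the topmost subscribeOn(xxxx).
3. The code that emits the data is ultimately run from that topmost source.subscribe(parent), so the data is emitted on that thread.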
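The three points above can be checked with a small experiment in plain Java. As before, this is a sketch under assumptions rather than RxJava itself: Source is a hypothetical stand-in for ObservableSource, two single-thread executors named "first" and "second" stand in for the schedulers, and the trail list records which thread each hop and the final emission ran on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ChainedSubscribeOnSketch {

    // Hypothetical stand-in for ObservableSource<String>.
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    // Each subscribeOn layer hops to its executor and then subscribes to its upstream.
    static Source subscribeOn(Source upstream, ExecutorService executor, List<String> trail) {
        return observer -> executor.execute(() -> {
            trail.add("hop:" + Thread.currentThread().getName());
            upstream.subscribe(observer);
        });
    }

    static List<String> runDemo() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        List<String> trail = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // The source emits the name of the thread that runs its subscribe logic.
        Source source = observer -> observer.accept("emit:" + Thread.currentThread().getName());

        // Same shape as source.subscribeOn(first).subscribeOn(second).
        Source chained = subscribeOn(subscribeOn(source, first, trail), second, trail);
        chained.subscribe(item -> {
            trail.add(item);
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return trail;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [hop:second, hop:first, emit:first]
    }
}
```

The later subscribeOn does hop to its own thread, but only to trigger the earlier one, which hops again. The source therefore always ends up on the thread of the subscribeOn closest to it.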
In the next post we will continue with the thread scheduling of the observer's methods.