Rxjava2.0 AndroidSchedulers. mai
2021-07-08 本文已影响0人
Android程序员老鸦
这篇文章探究一下rxjava安卓主线程是怎么实现的,上代码:
public final class AndroidSchedulers {
public static Scheduler mainThread() {
//老样式,这个方法实际直接返回入参MAIN_THREAD,追着他看就行
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//静态变量,initMainThreadScheduler()返回 MainHolder.DEFAULT,追着看MainHolder.DEFAULT
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
//静态内部类MainHolder ,DEFAULT 在这里初始化,HandlerScheduler就是我们要的主线程scheduler
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
最终得到一个HandlerScheduler对象,这个就是安卓主线程的Scheduler了,看他的构造方法,传入了一个new Handler(Looper.getMainLooper()),很熟悉吧。
了解这个类之前,先把它在哪里调用的代码放出来,在上上篇线程切换的文章里讲到过的:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//这个scheduler就是HandlerScheduler,createWorker()方法跟上篇的io线程方法步骤调用差不多
Scheduler.Worker w = scheduler.createWorker();
//最终这个 worker放入了ObserveOnObserver,最终执行schdule()方法来执行onNext()操作,达到切换线程的目的。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
既然调用步骤知道了,就带着这些方法来看看HandlerScheduler类:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
//直接new了一个HandlerWorker
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
//HandlerWorker是哥静态内部类,看来各种Scheduler的结构都是相似的
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
//构造方法,传入了那个有主线程looper的handler,
HandlerWorker(Handler handler) {
this.handler = handler;
}
//最终执行线程任务的地方
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//handler的消息机制,obtainf()方法把scheduled 这个raunnable给了massage的callback
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//发送出去了,最终这个message会被主线程的Handler劫持,然后因为msg的callback不为空,
//最终它会自己执行run()方法,这样就达到了在主线程执行下游onNext()的目的,
//不熟悉handler的要再去看看它的源码咯
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
//封装的一个代理类,方便管理生命周期
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
总结:
rxjava切换到android主线程的关键就是那个获取了mainLooper的handler,利用message和handler的机制达到了切换的目的。