火爆框架

Rxjava2.0 AndroidSchedulers. mai

2021-07-08  本文已影响0人  Android程序员老鸦

这篇文章探究一下rxjava安卓主线程是怎么实现的,上代码:

public final class AndroidSchedulers {

    public static Scheduler mainThread() {
        //老样式,这个方法实际直接返回入参MAIN_THREAD,追着他看就行
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
  //静态变量,initMainThreadScheduler()返回 MainHolder.DEFAULT,追着看MainHolder.DEFAULT
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    //静态内部类MainHolder ,DEFAULT 在这里初始化,HandlerScheduler就是我们要的主线程scheduler
    private static final class MainHolder {
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

最终得到一个HandlerScheduler对象,这个就是安卓主线程的Scheduler了,看他的构造方法,传入了一个new Handler(Looper.getMainLooper()),很熟悉吧。
了解这个类之前,先把它在哪里调用的代码放出来,在上上篇线程切换的文章里讲到过的:

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //这个scheduler就是HandlerScheduler,createWorker()方法跟上篇的io线程方法步骤调用差不多
            Scheduler.Worker w = scheduler.createWorker();
             //最终这个 worker放入了ObserveOnObserver,最终执行schdule()方法来执行onNext()操作,达到切换线程的目的。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

既然调用步骤知道了,就带着这些方法来看看HandlerScheduler类:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }
    //直接new了一个HandlerWorker
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    //HandlerWorker是哥静态内部类,看来各种Scheduler的结构都是相似的
    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;
        //构造方法,传入了那个有主线程looper的handler,
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
        //最终执行线程任务的地方
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //handler的消息机制,obtainf()方法把scheduled 这个raunnable给了massage的callback
          
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
             //发送出去了,最终这个message会被主线程的Handler劫持,然后因为msg的callback不为空,
            //最终它会自己执行run()方法,这样就达到了在主线程执行下游onNext()的目的,
            //不熟悉handler的要再去看看它的源码咯
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    //封装的一个代理类,方便管理生命周期
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
总结:

rxjava切换到android主线程的关键就是那个获取了mainLooper的handler,利用message和handler的机制达到了切换的目的。

上一篇下一篇

猜你喜欢

热点阅读