RxJava(2):线程切换原理

2021-07-07  本文已影响0人  壹元伍角叁分

一、subscribeOn(Schedulers.io())原理

  Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {

                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

源码分析

  1. Schedulers.io(),表明上面的代码执行在子线程中

    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
    
  2. .subscribeOn(Schedulers.io()),ObservableCreate和Schedulers.io()作为参数,new出一个ObservableSubscribeOn对象,返回

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    ObservableSubscribeOn中source就是ObservableCreate

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    }
    
  3. .subscribe(new Observer<String>())

    .subscribe(new Observer<Boolean>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
        }
    
        @Override
        public void onNext(Boolean aBoolean) {
            Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
        }
    });
    

    将终点的监听作为参数传入,subscribe()是ObservableSubscribeOn父类Observable的方法。实际调用的是ObservableSubscribeOn.subscribeActual(observer)方法

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //将终点observer封装一层SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //先调用终点observer的onSubscribe方法
        s.onSubscribe(parent);
        //下面代码重点是这个scheduler.scheduleDirect(new SubscribeTask(parent))
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
  4. scheduler.scheduleDirect(new SubscribeTask(parent))

    1. 首先是创建了一个SubscribeTask对象,他是实现了runnable接口的,看下它的run方法,调用了source.subscribe(parent),source就是obserableCreate,所以它的run方法是调用了obserableCreate.subscribe(parent)。

      final class SubscribeTask implements Runnable {
          private final SubscribeOnObserver<T> parent;
      
          SubscribeTask(SubscribeOnObserver<T> parent) {
              this.parent = parent;
          }
      
          @Override
          public void run() {
              source.subscribe(parent);
          }
      }
      
    2. scheduler就是IoScheduler。scheduler.scheduleDirect(),scheduleDirect是IoScheduler父类Scheduler的方法,方法中通过createWorker()创建了一个Worker对象,createWorker()具体实现是在IoScheduler中。IoScheduler的createWorker()创建并返回了一个EventLoopWorker对象。

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //createWorker()具体实现是在IoScheduler中。IoScheduler的createWorker()创建并返回了一个EventLoopWorker对象。
        final Worker w = createWorker();
        //继续封装
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //正在开始执行了,EventLoopWorker.schedule(task, delay, unit);
        w.schedule(task, delay, unit);
    
        return task;
    }
    
    @Override
    public Disposable schedule(@NonNull Runnable action) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
    
        return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
    }
    
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //再封装
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        //这里交由线程池处理,obserableCreate.subscribe(parent)就运行在了子线程中。
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

代码执行流程:

  1. observable.create传入了自定义source,返回一个obserableCreate对象;
  2. obserableCreate.subscribeOn(Schedulers.io())。Schedulers.io()返回的是一个IoScheduler对象,将其作为参数。在subscribeOn方法中,返回了一个ObservableSubscribeOn对象,将obserableCreate(即source)和IoScheduler(即scheduler)传入;
  3. ObservableSubscribeOn.subscribe(observer),传入一个自定义的终点监听observer。
  4. 在ObservableSubscribeOn.subscribeActual(observer)方法中,将传入的终点监听封装了一层,将终点observer对象作为参数,定义了一个SubscribeOnObserver对象(下面的parent)。然后先调用了终点observer的onSubscribe(parent),参数是刚定义的SubscribeOnObserver对象。
  5. 接着又调用了IoScheduler.scheduleDirect(new SubscribeTask(parent))。这个方法很重要。首先是创建了一个SubscribeTask对象,他是实现了runnable接口的,看下它的run方法,调用了source.subscribe(parent),source就是obserableCreate,所以它的run方法是调用了obserableCreate.subscribe(parent)。
  6. scheduleDirect是IoScheduler父类Scheduler的方法,方法中通过createWorker()创建了一个Worker对象,createWorker()具体实现是在IoScheduler中。IoScheduler的createWorker()创建并返回了一个EventLoopWorker对象。
  7. EventLoopWorker.schedule()方法中threadWorker.scheduleActual(),scheduleActual方法中继续将SubscribeTask进行封装一层,然后交由线程池去处理。这样就obserableCreate.subscribe(parent)就运行在了子线程中。

二、observeOn(AndroidSchedulers.mainThread())原理

 Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

源码分析

observeOn(AndroidSchedulers.mainThread()),表明下面的代码执行在主线程中

订阅过程

  1. AndroidSchedulers.mainThread(),返回的是HandlerScheduler对象,内部维护了一个主线程的Handler

    public final class AndroidSchedulers {
    
       private static final class MainHolder {
            //返回的是HandlerScheduler对象,内部维护了一个主线程的Handler
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
      private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
                
      public static Scheduler mainThread() {
             return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
       }
     }
    
    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    }
    
  2. .observeOn(AndroidSchedulers.mainThread()),HandlerScheduler作为参数,返回ObservableObserveOn对象

    public abstract class Observable<T> implements ObservableSource<T> {
    
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    }
    

    ObservableObserveOn中scheduler就是HandlerScheduler

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;//HandlerScheduler,内部维护了一个主线程的Handler
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    }
    
  3. .subscribe(new Observer<String>())

    .subscribe(new Observer<Boolean>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
        }
    
        @Override
        public void onNext(Boolean aBoolean) {
            Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
        }
    });
    

    将终点的监听作为参数传入,subscribe()是ObservableObserveOn父类Observable的方法,ObservableObserveOn没有重写subscribe()。而实际调用的是subscribeActual(),ObservableObserveOn重写了,所以走的是ObservableObserveOn.subscribeActual(observer)方法;

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);//这句是重点
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    再来看下ObservableObserveOn中subscribeActual的具体实现,实际调用的又是source.subscribe(),这个source就是前面保存的ObservableCreate对象。所以实际调用的是ObservableCreate.subscribe()。

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    
       @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
               //走的是这个。HandlerScheduler.createWorker(),返回HandlerWorker对象
                Scheduler.Worker w = scheduler.createWorker();
               //source就是自定义source,又包了一层,将终点observer和HandlerWorker作为参数
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
    

    HandlerScheduler.createWorker(),返回HandlerWorker对象。HandlerScheduler中的handler前面已经定义了,是一个主线程的Handler

    final class HandlerScheduler extends Scheduler {
       @Override
       public Worker createWorker() {
           return new HandlerWorker(handler);
       }
    }
    

    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))。source就是自定义source,又包了一层,将终点observer和HandlerWorker作为参数

    public final class ObservableCreate<T> extends Observable<T> {
       @Override
       protected void subscribeActual(Observer<? super T> observer) {
           CreateEmitter<T> parent = new CreateEmitter<T>(observer);//ObserveOnObserver,这边封了一层箱
           observer.onSubscribe(parent);//调用了终点的监听的onSubscribe方法
    
           try {
               source.subscribe(parent);//这边调用的是我们自定义source的subscribe方法
           } catch (Throwable ex) {
               Exceptions.throwIfFatal(ex);
               parent.onError(ex);
           }
       }
    }
    

响应事件过程

  1. 在自定义source中模拟调用onNext方法

    new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
            //observableEmitter就是CreateEmitter
            observableEmitter.onNext("测试onNext()");
        }
    }
    
  2. CreateEmitter.onNext(T t)内部又继续调用ObserveOnObserver.onNext(t);

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
        final Observer<? super T> observer; // ObserveOnObserver
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);//ObserveOnObserver.onNext(t);
            }
        }
    
  3. ObserveOnObserver.onNext(T t)

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        final Observer<? super T> actual;//终点observer
        final Scheduler.Worker worker;//HandlerWorker
        
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
    
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        
          void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);//HandlerWorker.schedule(this)
                }
            }
    }
    
  4. HandlerWorker.schedule(this),向主线程中发送消息,执行ScheduledRunnable的run方法

    private static final class HandlerWorker extends Worker {
        private final Handler handler;//是一个主线程的Handler
    
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            if (disposed) {
                return Disposables.disposed();
            }
    
            run = RxJavaPlugins.onSchedule(run);
    
            //handler:主线程的Handler。run:ObserveOnObserver
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
            //创建
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
    
            return scheduled;
        }
    }
    
  5. ScheduledRunnable.run(),run()又调用了ObserveOnObserver.run()

     private static final class ScheduledRunnable implements Runnable, Disposable {
            private final Handler handler;
            private final Runnable delegate;
    
            ScheduledRunnable(Handler handler, Runnable delegate) {
                this.handler = handler;
                this.delegate = delegate;
            }
    
            @Override
            public void run() {
                try {
                    delegate.run();//ObserveOnObserver.run()
                } catch (Throwable t) {
                    IllegalStateException ie =
                        new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                    RxJavaPlugins.onError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    }
    
  6. ObserveOnObserver.run();

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
       @Override
       public void run() {
           if (outputFused) {
               drainFused();
           } else {
               drainNormal();
           }
        }
    }
    
     void drainFused() {
        ...
        actual.onNext(null);//执行终点observer的next方法,在主线程中执行
        ...
    }
    

代码执行流程:

上一篇 下一篇

猜你喜欢

热点阅读