2019-11-08 Rxjava 源码解析<3>

2019-11-11  本文已影响0人  猫KK

上一篇,我们看了如何使用.subscribeOn(Schedulers.io())方法就能使subscribe在子线程中运行,接下来继续看,如何在切回主线程

    var sources = object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("下一步")
                emitter.onComplete()
            }
        }
        var observable = Observable.create(sources)
        var observable1 = observable.subscribeOn(Schedulers.io())
        var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
        var observer = object :Observer<String>{
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
            }

            override fun onError(e: Throwable) {
            }
        }
        observable2.subscribe(observer)

前面都分析了,看observable1.observeOn(AndroidSchedulers.mainThread())做了什么

    public final Observable<T> observeOn(Scheduler scheduler) {
       //bufferSize() 缓冲大小
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //做判断
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //返回ObservableObserveOn对象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

所以observable1.observeOn(AndroidSchedulers.mainThread())返回的是ObservableObserveOn对象,那么当调用 observable2.subscribe(observer)方法时,就是调用ObservableObserveOn.subscribe()方法,根据前两篇可以知道,就会进入ObservableObserveOn的subscribeActual()方法中

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //判断scheduler的类型
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

首先会判断scheduler的类型,scheduler的类型就是AndroidSchedulers.mainThread()返回的对象,看这个返回什么

    public static Scheduler mainThread() {
        //返回MAIN_THREAD的值
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    //这里会调用call方法,所以返回MainHolder.DEFAULT
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    //静态内部类,所以返回的是HandlerScheduler对象
    private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

通过上面可以知道scheduler是HandlerScheduler对象,其中HandlerScheduler是直接继承Scheduler,所以scheduler不是TrampolineScheduler类型,回到subscribeActual()方法中

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //前面分析了,这里是false,会进入else分支
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //通过scheduler生成Worker
            Scheduler.Worker w = scheduler.createWorker();
            //绑定,其中source是上一层的observable,在这里就是observable1
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

根据前面的分析source.subscribe()就会进入ObserveOnObserver的onSubscribe方法中

    //ObserveOnObserver的onSubscribe方法

       @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                //....

                //初始化queue
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //调用observer的onSubscribe方法,downstream就是我们的observer
                downstream.onSubscribe(this);
            }
        }

这个方法主要就是初始化queue,根据前面,可以知道,当调用onNext,也会来到这里的onNext

    //ObserveOnObserver的onNext

       @Override
        public void onNext(T t) {
            //判断是否停止,为false
            if (done) {
                return;
            }
            //判断是否为不是异步
            if (sourceMode != QueueDisposable.ASYNC) {
                //将该消息放入queue中
                queue.offer(t);
            }
            //调度
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //调用worker.schedule()方法
                worker.schedule(this);
            }
        }

又回到了worker.schedule方法,其中worker是通过HandlerScheduler.createWorker()生成的

    @Override
    public Worker createWorker() {
        //返回HandlerWorker对象
        return new HandlerWorker(handler, async);
    }

当前worker是HandlerWorker对象,看schedule()方法

        //注意,这里是三个参数的方法,调用的时候是调用一个参数的方法
        //是因为在父类中一个参数的方法会调用三个参数的方法,所以来到这里
        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            //做判断
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            //判断是否解绑
            if (disposed) {
                return Disposables.disposed();
            }
           //检验run
            run = RxJavaPlugins.onSchedule(run);
            //创建ScheduledRunnable对象,其中ScheduledRunnable是实现Runnable
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //生成Message
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            //async为false
            if (async) {
                message.setAsynchronous(true);
            }
            //通过handler发送消息
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

通过handler发送一个消息,所以会走到ScheduledRunnable的run方法中,注意,这里的handler的通过Looper.getMainLooper()方法获取,是主线程的handler,所以run是运行在主线程中的,这样就实现了从其他线程切换到主线程中

      //ScheduledRunnable的run方法

       @Override
        public void run() {
            try {
                //调用delegate的run方法
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

其中delegate就是我们传过来的run对象,也就是worker.schedule(this);传过来的this,所以又会回到ObserveOnObserver的run方法中

        @Override
        public void run() {
            //outputFused默认为false
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    
        void drainNormal() {
            int missed = 1;
            //获取queue,是在onSubscribe方法中初始化的
            final SimpleQueue<T> q = queue;
            //获取Observer,也就是我们的observer
            final Observer<? super T> a = downstream;
            //死循环
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                //双重死循环
                for (;;) {
                    //判断是否已经停止
                    boolean d = done;
                    T v;

                    try {
                        //获取onNext(T t)中传过来的参数
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    //判断是否获取到
                    boolean empty = v == null;
                    //如果没有获取到,则调用observer.onComplete()方法,并跳出循环
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //没有获取到,跳出循环
                    if (empty) {
                        break;
                    }
                    //调用observer.onNext并将参数传过去
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

之后就走到我们自己的observer的onNext方法中,并且,此时是运行在主线程中的,这样就实现了从其他线程切换到主线程的功能。

上一篇下一篇

猜你喜欢

热点阅读