Android-RxJava

杂谈之Rxjava线程变换原理浅谈

2019-10-26  本文已影响0人  曾逸111

刚参加工作那会,Rxjava很火,所谓的链式编程,很多人都说链式的风格看起来代码逻辑清晰,精简,容易看懂。但我好像不太以为然,毕竟,各种各样的操作符变换,什么map,flatMap,observeOn,subscribeOn分分钟把你绕晕。当然,如果从业务代码的角度,通过操作符将它们链式组装起来,确实会清晰很多。但当时我更关注的是其背后的东西,即,Rxjava是怎么组装我们的业务代码成为链式的,调用链是如何形成的,调用链的代码又是如何执行的,它的变换原理究竟是怎样的。后来,秉着知其然,也要知其所以然的原则就去研究了它的源码,记得当时,看flatMap的时候,总没法不看代码就能在大脑里想出其调用链的关系,总感觉饶不过弯,不看,就没法想明白,后来想啊想,终于在饭堂排队吃饭的时候突然给想懂了。记得当时得意又开心~

转眼间,工作已3年多。Rxjava出了2.0,近期好像在准备出3.0。2.0的代码看了一些,原理没有变,多了一些api,诸如disposable等等。
在近期的电话面试里,也经常比较喜欢问到:Rxjava是如何做到线程变换或者Observable变换的,它的变换原理是怎样的?但结果总是差强人意。。懂原理的人真的很少。

...

上篇文章讲了Rxjava的Observable变换原理,这次讲其线程变换原理。其实,线程变换也是基于Observable的变换。

我们知道,Android中我们一般是通过Handler机制去实现线程的变换的。无论是变换到主线程,或者是变换到拥有Looper的子线程,Handler都可以搞定。那么Rxjava中的线程变换,也是使用到了Handler吗?其实,除了使用Handler之外,其也用到了Executor,线程池。但是,对外暴露的不是Handler,也不是Executor,而是一个叫Scheduler的抽象类,还有它内部的一个叫Worker的抽象类。由这两个类,再最后delegate到Executor或者Handler来实现变换。Scheduler叫调度器,一般Scheduler的执行会delegate到Worker来执行。我们在创建Observable的时候,就会通过操作符创建Observable所对应的Scheduler,当调用链执行到对应的Observable节点的时候,就会将后续任务或代码交给其对应的Scheduler来执行,从而达到切换线程的目的。

这里,调用链代码的执行可区分为两种,一种是向上产生订阅,即调用链向上游传递订阅任务;一种是订阅完成后,在上游执行完成任务之后向下游发射执行结果。
第一种,一般是将执行代码内置于Runnable中,再将Runnable丢给Scheduler来执行,从而达到切换线程的目的。
第二种,当然了,也是需要有一个Runnable来给Scheduler或者Worker执行,区别在于它会有一个队列来存放执行结果,然后在切换线程之后再从队列里获取。

举个例子🌰,看看源码
首先第一种是向上游传递订阅任务时的线程变换

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
--------
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

subscribeActual里边,通过scheduler.scheduleDirect(task)实现了线程切换;在切换之后继续向上游订阅,source.subscribe(parent)。

第二种:向下游发射任务结果的线程变换,一般应用于切换到主线程获取然后刷新界面。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

-----
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }


        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        
                    }
                  ....
                    a.onNext(v);
                }

先在subscribeActual中初始化worker,worker内置于Observer,然后继续向上游订阅传递Observer。接收订阅时,也就是在onNext,先将结果t存放于队列中,然后启用worker的任务,在worker执行的时候,将其从队列中取出,再继续向下游发射结果,完成线程的切换。

你看,线程切换原理就是这么简单~

上一篇 下一篇

猜你喜欢

热点阅读