RxJava2学习记录

RxJava2笔记(四、观察者线程切换)

2019-01-09  本文已影响10人  WonderSky_HY

在上一篇文章RxJava2笔记(三、订阅线程切换)中,我们分析了订阅线程是如何切换的,即调用subscribeOn()来切换订阅线程时都执行了哪些操作。在本文我们将继续介绍观察者线程切换,也就是将线程由子线程切换回UI线程。

继续在前面的基础上修改代码,在订阅线程切换方法后调用observeOn(AndroidSchedulers.mainThread())将线程切换回主线程:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        emitter.onNext(3);
        emitter.onComplete();
    }
})
    //将线程由UI线程切换到子线程执行IO请求
    .subscribeOn(Schedulers.io())
    //将线程切换回UI线程,方面后续操作更新UI界面
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

看下运行结果:

I/MainActivity: onSubscribe--运行线程:main
I/MainActivity: subscribe--运行线程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --运行线程:main
I/MainActivity: onNext: 2 --运行线程:main
I/MainActivity: onNext: 3 --运行线程:main
I/MainActivity: onComplete--运行线程:main

可以看到,subscribe方法运行在子线程中(也就是订阅线程运行在名为RxCachedThreadScheduler-1的一个子线程中,上文提到该线程是由RxJava实现的一个工厂类创建的),而observer运行在名为main的线程中,这个main线程就是UI线程。


看完了输出结果,接下来就看看这个observeOn(AndroidSchedulers.mainThread())是如何将线程切换到UI线程的,点进去看下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    //当出现异常时,默认无延迟发送错误。bufferSize()是缓冲区大小,RxJava设置了一个默认大小,为128。
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //包装类,保存上游observable
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

熟悉的套路,跟调用subscribe.on方法时类似,只是多了一个验证缓冲区大小不为空的代码,这些我们都略过,直接看ObservableObserveOn这个类:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //如果传入的调度器是TrampolineScheduler,则不切换线程,在当前线程调度
        //但是调度的任务并不是马上执行,而是等待当前任务执行完毕再执行
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //创建工作者worker
            Scheduler.Worker w = scheduler.createWorker();
            //上游的subscribe,该方法会触发上游的subscribeActual,
            //ObserveOnObserver也是一个包装类,保存下游的observer
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    //......代码省略
}

熟悉的装饰器模式:

这里简要介绍下步骤3:
这里的source是上游的observable对象,source.subscribe()方法实际调用的是上游observable对象的subscribeActual方法,并将下游observer对象的包装类ObserveOnObserver作为参数传递进去,在上游observable对象的subscribeActual方法内,调用ObserveOnObserver包装类中的onSubscribe,onNext等方法,进而调用下游observer的onSubscribe,onNext等方法。

接下来看下ObserveOnObserver这个下游observer包装类:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    private static final long serialVersionUID = 6576896619930983584L;
    //下游observer
    final Observer<? super T> actual;
    //调度工作者
    final Scheduler.Worker worker;
    //当订阅任务执行出错时,是否延迟发送错误消息,默认为false,也就是不延迟
    final boolean delayError;
    //缓冲区大小,缓存上游发送的事件
    final int bufferSize;
    //存储上游observable发出的数据队列
    SimpleQueue<T> queue;
    //存储管理下游observer的订阅状态disposable
    Disposable s;
    //订阅任务执行出错时,存储错误信息
    Throwable error;
    //订阅任务是否终止
    volatile boolean done;
    //订阅任务是否被取消
    volatile boolean cancelled;
    //任务执行模式----同步还是异步
    int sourceMode;
    //是否输出融合(通常情况下该选项为false)
    boolean outputFused;

    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
        //当前的disposable为null,上游subscribe产生的disposable不为null,则验证通过
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            //如果订阅时获取的disposable对象s是QueueDisposable类型的
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                //新建QueueDisposable队列并将订阅时获取的disposable对象s强转为QueueDisposable,然后赋值给queue
                QueueDisposable<T> qd = (QueueDisposable<T>) s;
                //获取任务运行模式
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                //判断运行模式,并且调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    //为true 使得接下来的onXXX等方法均不会执行
                    done = true;
                    actual.onSubscribe(this);
                    //worker直接调度任务
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    //在异步模式下,等待onXXX方法中的worker调度
                    return;
                }
            }
            //否则创建一个支持单一生产者单一消费者的队列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            //调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
            actual.onSubscribe(this);
        }
    }

    //......代码省略
}

onSubscribe方法调用后,就开始执行onXXX方法了,首先是onNext方法,这个方法可以反复调用:

@Override
public void onNext(T t) {
    //订阅模式是同步模式或者执行过onComplete/onError方法时,此时done为true,直接返回
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        //将上游数据源发射的数据添加到缓存队列中
        queue.offer(t);
    }
    //开始worker调度任务
    //(这里面调用了handler,将数据发送到主线程所在的消息队列,进而更新UI界面,这里稍后分析)
    schedule();
}

其次是onError和onComplete,这两个方法只能执行一次并且是互斥的:

@Override
public void onError(Throwable t) {
    //如果任务状态已经是终止状态,再执行该任务是就会抛出异常
    if (done) {
        RxJavaPlugins.onError(t);
        return;
    }
    error = t;
    //设置任务状态为终止状态
    done = true;
    //worker任务调度
    schedule();
}

@Override
public void onComplete() {
    //如果任务是终止状态,直接返回
    if (done) {
        return;
    }
    //设置任务状态为终止状态
    done = true;
    //worker任务调度
    schedule();
}

这里onNext,onComplete,onError最后都调用schedule()来调度任务:

//调用worker.schedule(this)开始任务调度
void schedule() {
    //这里通过getAndIncrement() == 0原子性的保证了worker.schedule(this)在调度完之前不会再次被调度
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

上面在执行worker.schedule(this)时传入了this,也就是当前对象ObserveOnObserver,ObserveOnObserver类实现了Runnable接口,因此worker.schedule(this)调度的任务就是自己run()实现方法中的任务:

@Override
public void run() {
    //outputFused通常情况下为false
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

drainFused()通常情况下不会执行,我们只需要关注drainNormal()方法即可,在查看该方法之前,先看下drainNormal()内部调用的一个验证方法checkTerminated(boolean d, boolean empty, Observer<? super T> a),该方法主要是检测任务是否已终止:

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
    //如果订阅被取消,清空数据缓存队列
    if (cancelled) {
        queue.clear();
        return true;
    }
    
    //这个d就是done
    if (d) {
        //done为true时,有两种情况,在onNext调度完毕后执行onComplete或onError
        Throwable e = error;
        if (delayError) {
            //如果是延迟发送错误的情况,必须等到queue(缓存上游observable发出的数据)为空的情况下才能发送错误(有错误的情况下)
            if (empty) {
                if (e != null) {
                    a.onError(e);
                } else {
                    a.onComplete();
                }
                //终止worker任务调度
                worker.dispose();
                return true;
            }
        } else {
            //不延迟发送错误时,直接调用
            if (e != null) {
                //如果任务执行出错,即调用了onNext方法,清空queue
                queue.clear();
                //调用下游observer的onError
                a.onError(e);
                //终止worker调度
                worker.dispose();
                return true;
            } else
            if (empty) {
                //任务正常执行,未出现错误
                //调用下游observer的onComplete,并终止worker调度
                a.onComplete();
                worker.dispose();
                return true;
            }
        }
    }
    //否则返回false,任务还未终止
    return false;
}

任务结束情况分为以下两种

说完了这个方法,我们继续看drainNormal()这个方法:

void drainNormal() {
    //这个变量只是一个控制变量,用来确保drainNormal()方法能被原子性调用
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    
    //外层死循环
    for (;;) {
        //根据数据缓存队列是否为空检查任务是否已终止
        //为空就会直接跳出外层循环,方法执行结束,内层的循环也不会执行
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        //内层死循环,负责具体的任务处理
        for (;;) {
            boolean d = done;
            T v;

            try {
                //从队列中获取数据
                v = q.poll();
            } catch (Throwable ex) {
                //任务执行出现错误,抛出致命错误信息,结束循环,终止本次任务
                Exceptions.throwIfFatal(ex);
                //终止订阅
                s.dispose();
                //清空缓存队列
                q.clear();
                a.onError(ex);
                //停止worker调度
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            //判断从队列中取出的数据是否为空,即判断队列是否为空
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //队列中没有了数据,直接退出
            if (empty) {
                break;
            }
            //调用下游observer的onNext(onComplete和onError均在checkTerminated方法里调用)
            a.onNext(v);
        }
        //这里主要是确保在同一时间只有一个worker.schedule(this)正在执行。
        //missed变量在方法最开始初始化为1,这里missed会被重置为0,这样下面的missed==0成立,当前任务结束。
        //addAndGet(-missed)方法也会将AtomicInteger内部的VALUE值设置为0。
        //同时,run()方法中的判定方法getAndIncrement() == 0成立,继续执行下一个worker.schedule(this)。
        //如果程序没有走到这里,那么missed==0也就不成立,相应的run()方法中的getAndIncrement() == 0不成立,也就不会执行下一个worker.schedule(this)。
        //这样就原子性的确保了同一时间只有一个worker.schedule(this)正在执行,即同一时间只有一个drainNormal()方法在执行。
        missed = addAndGet(-missed);
        //程序走到这里,表示当前任务正常结束,退出循环。
        //run()方法继续执行,getAndIncrement() == 0成立,开始执行下一个worker.schedule(this)。
        if (missed == 0) {
            break;
        }
    }
}

到这里,ObserveOnObserver这个下游observer包装类也就介绍的差不多了,简要总结下:


在上一篇文章中,订阅线程最终是通过调度器来执行具体切换过程的。同样的,对于观察者线程切换执行的也是类似的过程。前面分析道,ObservableObserveOn构造方法接收我们传入的调度器scheduler,并通过scheduler创建工作者worker,将其传入到ObserveOnObserver的构造方法中,最后在run()方法中执行具体的任务调度。因此观察者的线程切换肯定是发生在worker的调度过程中。

先从observeOn(AndroidSchedulers.mainThread())的参数AndroidSchedulers.mainThread()开始,点进去看下:

public final class AndroidSchedulers {

    private static final class MainHolder {
        //构造一个与UI线程关联的Handler,并将其作为参数构造HandlerScheduler调度器
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    //返回与主线程相关的scheduler调度器
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    //自己指定线程Looper自定义观察者线程切换
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

AndroidSchedulers.mainThread()最终返回HandlerScheduler对象,HandlerScheduler也是继承自Scheduler,其构造方法接收一个Handler类型的参数,这个Handler通过Looper.getMainLooper()与UI线程关联起来,这样通过Handler发送的消息就能被UI线程接收,从而更新UI界面。具体返回HandlerScheduler对象的步骤与上文所介绍的订阅线程切换生成IoScheduler一致,不再详述。

熟悉Handler的童鞋都知道,Handler在安卓的消息机制中占有重要的地位,它贯穿着整个安卓的体系,在很多地方我们都能见到它的身影。在刚开始接触安卓开发的时候我们都写过类似下面的代码:

private Handler handler = new Handler(){
    @Override
    public void handleMessage(Message msg) {
        super.handleMessage(msg);
        //更新UI线程代码
        //获取子线程发送的message消息里面的请求数据,更新UI界面
        //操作省略
    }
};

public void getData(){
    new Thread(new Runnable() {
        @Override
        public void run() {
            //1、网络请求数据并返回
            //2、获取Message对象,并将请求到的数据用Message包裹起来
            //3、调用handler的sendMessage等方法发送Message
        }
    }).start();
}

随着现在各种各样网络请求框架的出现,大大简化了我们网络请求更新UI的操作,上面的代码我们很少再去写了,但并不意味着它就不重要了,尤其是Handler。其实很多的网络请求框架都只不过是将上面的操作封装起来了而已,RxJava虽然与网络请求无关,但在观察者线程切换里面同样也是将上面的过程封装起来,方便我们使用。

我们接着来看HandlerScheduler这个类:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    //......代码省略

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    //......代码省略
}

HandlerScheduler继承自Scheduler,并实现了createWorker()方法生成任务调度worker,这里返回的是一个HandlerWorker对象,前面提到的worker.schedule(this)中的worker实际上就是这个HandlerWorker。

private static final class HandlerWorker extends Worker {
    //保存外部传进来的Handler,这里保存的是与UI线程关联的Handler,具体参见上面介绍
    private final Handler handler;
    //事件订阅状态标志位
    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    //worker.schedule(this)最终调用的方法,这里delay为0,表示handler无延迟发送消息
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        //如果订阅已终止,返回带有终止状态的disposable
        if (disposed) {
            return Disposables.disposed();
        }
        //对任务做一些自己的处理(默认情况下没做任何处理)
        run = RxJavaPlugins.onSchedule(run);
        //包装类
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        //获取message,设置其target成员为handler,callback成员为scheduled
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        //将消息发送到UI关联的消息队列中,此时handler中的queue是UI线程中的queue
        //(参考new Handler(Looper.getMainLooper()))
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        //再次检查订阅是否被终止,若已被终止,移除handler中的callback,并返回带有终止状态的disposable
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }
    
    //订阅终止
    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacksAndMessages(this /* token */);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

在HandlerWorker中,将任务run用ScheduledRunnable包装起来,在设置到Message的callback中,生成一个Message,熟悉handler的都知道,handler通过dispatchMessage(Message msg)方法来进行消息的分发处理,这个方法的处理顺序如下:

生成Message后,调用handler的sendMessageDelayed方法发送消息(这里delay参数为0,因此是立即发送),由于这里的handler是与UI线程关联在一起的,因此ObserveOnObserver(实现了Runnable接口)内部的run()方法就被发送到了UI线程中的消息队列中,最终通过handler的dispatchMessage(Message msg)方法调用handleCallback(msg),最后调用message.callback.run()执行run()方法里面的代码,完成UI线程更新。

在文章的最后,我们来看下这个run任务的包装类ScheduledRunnable:

private static final class ScheduledRunnable implements Runnable, Disposable {
    private final Handler handler;
    private final Runnable delegate;

    private volatile boolean disposed;

    //保存外部传入的handler和runnable任务
    ScheduledRunnable(Handler handler, Runnable delegate) {
        this.handler = handler;
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            //执行run方法,即传入的ObserveOnObserver(实现了Runnable接口)内部的run()方法
            delegate.run();
        } catch (Throwable t) {
            IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
            RxJavaPlugins.onError(ie);
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    }

    //订阅终止,从handler中移除该runnable
    //(实际上是从UI线程内部的队列中将包装这个runnable的message移除,如果这个message还未处理的话)
    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacks(this);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

至此,整个观察者线程切换也就介绍完了,最后我们再来梳理下思路:

下一章RxJava2笔记(五、订阅流程梳理以及线程切换次数有效性)将对前面做一个流程梳理,以此来结束RxJava的学习。

上一篇 下一篇

猜你喜欢

热点阅读