Android开发Android技术知识Android开发经验谈

RxJava 2.x 源码分析(三)之一步一步了解切换线程原理

2018-06-05  本文已影响26人  zYoung_Tang

前文

Android 开发离不开异步操作,比如网络请求必须在子线程中进行,获取数据后可能需要更改UI,但是UI只能在主线程中被修改,所以我们经常在子线程和主线程中切换,代码就会比较繁琐不方便阅读。RxJava 的诞生是为了更好的处理异步操作,前面两篇文章我们了解到 RxJava 使用了观察者模式,现在继续了解 RxJava 利用观察者模式同时实现了线程切换。

线程调度

RxJava 中线程调度的核心方法是subscribeOn()observeOn()

在查看源码之前可以先从下面几个例子中看效果

下面的例子都使用的是相同的被观察者观察者:

创建被观察者和观察者

// 创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        Log.d("RXJAVA_LOG", "发射事件1 -- " + Thread.currentThread().getName());
        e.onNext("item 1");
        Log.d("RXJAVA_LOG", "发射事件2 -- " + Thread.currentThread().getName());
        e.onNext("item 2");
        Log.d("RXJAVA_LOG", "发射事件3 -- " + Thread.currentThread().getName());
        e.onNext("item 3");
        Log.d("RXJAVA_LOG", "发射事件4 -- " + Thread.currentThread().getName());
        e.onNext("item 4");
    }
});

// 创建观察者
Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.d("RXJAVA_LOG", "接受事件 : " + s + " -- " + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
};

例子1:不切线程

observable.subscribe(observer);

输出:

发射 item1     -- main
接受 item1 -- main
发射 item2     -- main
接受 item2 -- main
发射 item3     -- main
接受 item3 -- main
发射 item4     -- main
接受 item4 -- main

例子2:切换线程

 observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

输出:

发射 item1     -- RxCachedThreadScheduler-1
发射 item2     -- RxCachedThreadScheduler-1
发射 item3     -- RxCachedThreadScheduler-1
发射 item4     -- RxCachedThreadScheduler-1
接受 item1 -- main
接受 item2 -- main
接受 item3 -- main
接受 item4 -- main

例子3:使用FlatMap并切换线程

observable
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            final String str = s;
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    Log.d("RXJAVA_LOG", "发射 " + str + "-sub -- " + Thread.currentThread().getName());
                    e.onNext(str + "-sub");
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

输出:

发射 item1     -- RxCachedThreadScheduler-1
发射 item1-sub -- RxCachedThreadScheduler-1
发射 item2     -- RxCachedThreadScheduler-1
发射 item2-sub -- RxCachedThreadScheduler-1
发射 item3     -- RxCachedThreadScheduler-1
发射 item3-sub -- RxCachedThreadScheduler-1
接受 item1-sub -- main
发射 item4     -- RxCachedThreadScheduler-1
发射 item4-sub -- RxCachedThreadScheduler-1
接受 item2-sub -- main
接受 item3-sub -- main
接受 item4-sub -- main

从上面的例子可以看出:

那么我们就可以把网络请求操作作为被观察者的事件,并且把发送事件线程切换到IO线程,然后把更新UI操作放到观察者接收事件的回调中,并把接收事件所在线程切换回主线程,这样就可以利用 RxJava 完成我们网络请求的异步操作。


以例子2为阅读源码的例子,根据subscribeOn()、observeOn()、subscribe()的调用顺序分为三步查看

1. subscribeOn()

下面我们开始阅读源码,先从subscribeOn(Schedulers.io())开始看,首先看看io()方法

// Schedulers.java
/**
 * Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
    ...
 */
@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

这里出现了一个变量IO,看看是什么东西

// Schedulers.java
public final class Schedulers {
    @NonNull
    static final Scheduler IO;
    
    static {    
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ...
    }
    ...
}

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

原来变量IO是一个Scheduler对象且是Schedulers的静态成员变量且在类加载的时候已经初始化了,PS:Schedulers可以看成是一个用来创建Scheduler的工厂,提供多个静态方法如io()newThread()computation等来获取适用于不同场景的Scheduler

接着看初始化过程,首先创建了一个IOTask实例,然后传给RxJavaPlugins的初始化方法initIoScheduler()

// RxJavaPlugins.java
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    // 检查 IOTask 是否为空
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
    if (f == null) { // onInitIoHandler 没有被设置,所以为 null
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler);
}

@NonNull
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        // s : IOTask
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

首先检查IOTask对象是否为空,然后调用callRequireNonNull方法调用IOTask.call(),并返回call()的返回值,即返回IoHolder.DEFAULT,IoHolder又是什么呢,原来是Schedulers的内部类,变量IO的本质原来是IoScheduler对象

// Schedulers.java
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

变量IO的身份我们已经知道了,回到一开始的io()看看RxJavaPlugins.onIoScheduler(IO)做了什么

// RxJavaPlugins.java
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) { // f = null
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}

由于onIoHandler是静态变量且没有初始化,所以为null,直接返回我们的变量IO

所以io()方法最后返回的是IoScheduler对象。

接着看subscribeOn()

// Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    // 检查是否为 null
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

返回的是ObservableSubscribeOn对象,且保存了上游的被观察者Scheduler对象

// ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); // source: 上游的被观察者
        this.scheduler = scheduler; // 本例子是 IoScheduler
    }
    ...
}

到此,subscribeOn()的方法已经看完了,最终返回一个ObservableSubscribeOn对象。


2. observeOn()

首先看看AndroidSchedulers.mainThread()怎么获取主线程操作的 scheduler

// AndroidSchedulers.java
public final class AndroidSchedulers {
    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
        
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
    });
    ...
}

这里出现了一个静态变量MAIN_THREAD,且由RxAndroidPlugins初始化,代码如下

// RxAndroidPlugins.java
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
    if (scheduler == null) { // false
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) { // onInitMainThreadHandler 为 null
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
}

static Scheduler callRequireNonNull(Callable<Scheduler> s) {
    try {
        // s : 传入 initMainThreadScheduler 的匿名对象
        Scheduler scheduler = s.call();
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    } catch (Throwable ex) {
        throw Exceptions.propagate(ex);
    }
}
// AndroidSchedulers.java
private static final class MainHolder {
    // 通过 Looper.getMainLooper() 获取主线程的 Looper
    // 创建在主线程 Looper 上的 Handler
    // 用这个 Handler 创建 HandlerScheduler 对象
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

DEFAULT其实就是一个拥有主线程Handler的HandlerScheduler对象,所以AndroidSchedulers.mainThread()最终返回的是HandlerScheduler对象。

下面回归到observeOn()

// Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // this : ObservableSubscribeOn
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

从上面代码可以看见observeOn最终返回的是ObservableObserveOn对象,且在我们的例子中这个对象拥有的 Scheduler 是HandlerScheduler


3. subscribe()

到这里我们知道本例子中被观察者ObservableObserveOn

这个方法从之前两篇文章中知道它是一个启动方法,一旦调用,被观察者就开始发送事件给观察者,而且实际执行的方法是被观察者subscribeActual(),还记得在我们的例子中最外层的被观察者ObservableObserveOn所以我们从它的subscribeActual()中开始看起

// ObservableObserveOn.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) { // false
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        // source : ObservableSubscribeOn
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

在第二步我们知道ObservableObserveOn中的 scheduler 是HandlerScheduler,所以走 else ,首先执行了Scheduler.Worker w = scheduler.createWorker();,创建了HandlerScheduler.HandlerWorker对象

// HandlerScheduler.java
@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;
    
    private volatile boolean disposed;
    
    HandlerWorker(Handler handler) {
        this.handler = handler;
    }
    ...
}

在这里先不深入了解HandlerWorker,但是我们可以猜到worker是切换线程的关键。
接着看

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

这里的source是什么?还记得第一步调用subscribeOn()返回的ObservableSubscribeOn对象吗,第二步相当于调用了ObservableSubscribeOn.observeOn()到最后创建ObservableObserveOn对象的时候ObservableSubscribeOn赋值给了source

由于ObservableSubscribeOn没有覆写subscribe()方法,所以source.subscribe()调用的是Observable.subscribe()

// Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // observer : ObserveOnObserver
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

最终会调用ObservableSubscribeOnsubscribeActual()

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    // s : ObserveOnObserver
    // 创建 SubscribeOnObserver 包装了 s
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 调用我们的回调方法 onSubscribe
    s.onSubscribe(parent);
    // scheduler : IoScheduler
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> actual;

    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }
    
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
        
    // 省略以下方法
    // onNext onError onComplete dispose isDisposed
    ...
}

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        // source : 我们创建的被观察者
        // parent : SubscribeOnObserver
        source.subscribe(parent);
    }
}

subscribeActual()只有三行代码,首先创建SubscribeOnObserver包装我们创建的观察者,然后调用我们的观察者回调方法onSubscribe(),而最后一行代码

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

是最重要的,这行代码的执行引起了一连串方法调用,激活了被观察者开始发射事件以及后面的一连串的操作,之后我们会在多个类里不停的跳转,见证着一系列动作。

这行代码分三步,但只需要关心前两步,第三步parent.setDisposable是设置Disposable,提供可断开被观察者观察者连接的操作:

1. new SubscribeTask()

创建SubscribeTask,SubscribeTask 继承了 Runnable ,重点看下 run 方法里面是

source.subscribe(parent);

根据前面两篇文章我们知道这行代码会调用Observable.subscribeActual(observer),然后调用ObservableCreate. subscribeActual()

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

最后调用我们创建被观察者时覆写的 subscribe 方法,从而激活了被观察者发射事件

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        ...
});

所以我们可以预知随着代码的深入,SubscribeTask.run()将会在子线程中被调用,从而实现了被观察者在子线程中发射事件。

2. IoScheduler.scheduleDirect()

实际上调用的是IoScheduler父类SchedulerscheduleDirect()

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // w : EventLoopWorker
    final Worker w = createWorker();
    // run : SubscribeTask
    // decoratedRun : SubscribeTask
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        
    DisposeTask task = new DisposeTask(decoratedRun, w);
        
    w.schedule(task, delay, unit);
        
    return task;
}
// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
w.schedule(task, delay, unit);

时执行的是 EventLoopWorker.schedule(),看看EventLoopWorker

// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) { // false
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

注意到构造方法里成员threadWorker是从CachedWorkerPool缓存池中获取的,EventLoopWorker.schedule()最终会调用NewThreadWorker.scheduleActual

// NewThreadWorker.java
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {  
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {  // delayTime = 0
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

scheduleActual()内不做深入了解,只需要知道会执行下面这行代码

f = executor.submit((Callable<Object>)sr);

executor.submit()内部也不做深入介绍,只需要知道最终会在子线程调用SubscribeTask.run()证实了上文的猜测即可。

到了这里已经到达了我们的被观察者中发射事件的操作了,假设从网络请求中返回了数据可以通过onNext方法发射给观察者,那么接下来就看看在子线程中发射事件之后观察者是如何在主线程中接收事件并作出处理操作的。

// ObservableCreate.java
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

到这里我们会有疑问这个观察者是哪个?刚刚证实了被观察者发射事件的过程是放在SubscribeTask,现在回顾一下创建SubscribeTask时的代码

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    // s : ObserveOnObserver
    // 创建 SubscribeOnObserver 包装了s
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 调用我们的回调方法 onSubscribe
    s.onSubscribe(parent);
    // scheduler : IoScheduler
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

看完之后我们明白了原来顶层的观察者SubscribeOnObserver,先我们看看它的构造方法,用变量actual保存了ObserveOnObserver对象

// ObservableSubscribeOn.SubscribeOnObserver
SubscribeOnObserver(Observer<? super T> actual) {
    this.actual = actual;
    this.s = new AtomicReference<Disposable>();
}

看看SubscribeOnObserveronNext()

@Override
public void onNext(T t) {
    actual.onNext(t);
}

里面调用了ObserveOnObserver.onNext()

// ObservableObserveOn
@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        // worker : HandlerWorker
        // this : ObserveOnObserver
        worker.schedule(this);
    }
}

ObserveOnObserver内有一个队列 queue 用来保存收到的数据,然后调用schedule(),根据之前所说的观察者的worker是HandlerWorker,前面贴的代码并没有放schedule()这个方法,下面重新贴出来:

先调用父类Scheduler.workerschedule()

// Scheduler.worker
@NonNull
public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

再调用HandlerWorkerschedule()

// HandlerScheduler.HandlerWorker
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");
    
    if (disposed) {
        return Disposables.disposed();
    }
    
    run = RxJavaPlugins.onSchedule(run);
    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    
    return scheduled;
}

其主要操作就是把ObserveOnObserver包装成ScheduledRunnable放进Message,然后通过HandlerWorker绑定的Handler(即主线程Handler)发送到消息队列等待处理。

这里有个疑问就是schedule()接收的是 Runnable 类型的参数,为什么我们可以把ObserveOnObserver传进去呢?之前没有贴完整的ObserveOnObserver代码,现在看看到底是什么东西

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    ...
    @Override
    public void run() {
        if (outputFused) { // false
            drainFused();
        } else {
            drainNormal();
        }
    }
        
    void drainNormal() {
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    return;
                }
    
                if (empty) {
                    break;
                }
    
                a.onNext(v);
            }
    
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    ...
}

原来ObserveOnObserver也实现了Runnable,且run()里会调用drainNormal,就是把上面用来保存事件的队列 queue 都发送到观察者


到这里我们已经把例子2切换线程的过程看了一遍,之前只是知道简单的调用两个方法就能随意切换线程,现在终于通过例子了解到里面的原理了。

上一篇下一篇

猜你喜欢

热点阅读