Rxjava2.1 线程切换原理解析

2019-08-28  本文已影响0人  innovatorCL

一、前提说明

本文是在 Rxjava 2.1 的基础上进行的,目前只对 Rxjava 进行解析,未搭配 Retrofit 食用,如果想看 Rxjava + Retrofit 源码解析,请移步 Retrofit 2.1 + Rxjava 源码解析(一)

二、Rxjava 使用栗子

new Thread("子线程"){
          @Override
          public void run() {
              Observable.create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "Observable#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      })
//              .subscribeOn(Schedulers.io())
                      .observeOn(Schedulers.io())
                      .subscribe(new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable d) {
                              Log.e(TAG, "observer#onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onNext(String s) {
                              Log.e(TAG, "observer#onNext(): 所在线程为 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onError(Throwable e) {
                          }

                          @Override
                          public void onComplete() {
                              Log.e(TAG, "observer#onComplete(): 所在线程为 " + Thread.currentThread().getName());
                          }
                      });
          }
      }.start();

输出结果:

E/Rxjava: observer#onSubscribe(): 所在线程为 子线程
E/Rxjava: Observable#subscribe(): 所在线程为 子线程
E/Rxjava: observer#onNext(): 所在线程为 RxCachedThreadScheduler-1
E/Rxjava: observer#onComplete(): 所在线程为 RxCachedThreadScheduler-1

Rxjava2.1订阅流程解析 中我们已经分析了 Observable.create() 的过程,就是构建一个 ObservableCreate 对象,ObservableCreate 是 Observable 的子类。

由上文可以知道,当调用了 subscribe() 后,会执行以下顺序:Observable.subscribe(Observer) -> ObservableCreate.subscribeActual(Observer) -> Observer#onSubscribe(),所以可以知道 Observer#onSubscribe() 的执行线程是当前线程,即调用了 subscribe()的线程。

三、Observable.observeOn(Schedulers.io())

从上面栗子可以看到,如果我们只是调用了 observeOn(Schedulers.io()),这样影响的是 observer 的 onNext() 和 onComplete(),对于 ObservableOnSubscribe#subscribe() 和 Observer#onSubscribe() 是没有影响的。

我们看看 Observable.observeOn(Schedulers.io()) 的源码:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //删除无关紧要的代码
        //这里的 this 是 ObservableCreate 对象
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }

可以看到就是将传入的 ObservableCreate 对象封装进了 ObservableObserveOn 对象中,可以肯定的是 ObservableObserveOn 也是 Observable 的子类。

我们从上文得知,接下来会调用 observable.subscribe(observer) 的时候会跳转调用 Observable 子类的 ObservableObserveOn.subscribeActual(observer) 方法。这其实是用了静态工厂模式。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
         // 如果传入的 scheduler 是 Scheduler.trampoline() 的情况
        // 该线程的意义是传入当前线程,也就是不做任何线程切换操作
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //这里的 source 是 ObservableCreate 对象
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

当需要切换线程的时候,可以看到将传进来的 ObservableCreate 对象进行了订阅,只不过观察者又被封装成了 ObserveOnObserver 对象。这样就会执行 ObservableCreate#subscribeActual()

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //这里的 observer 就是 ObserveOnObserver 对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //这里的 observer 就是 ObserveOnObserver 对象
        observer.onSubscribe(parent);

        try {
            //这里的额 source 就是我们在最外层创建的 ObservableOnSubscribe 对象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

这里可以看到使用了 observeOn(Schedulers.io()) 方法,但是 Observer#onSubscribe(Disposable d) 并没有切换线程,仍在当前线程中运行。也就是 ObserveOnObserver.onSubscribe() 是运行在当前线程的。我们看看这个方法做了什么:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //真正的观察者,最外层我们创建的 observer
        final Observer<? super T> actual;
        final Scheduler.Worker worker;

        Disposable s;

        ......
        
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
    
    void schedule() {
            if (getAndIncrement() == 0) {
                //this 就是 ObserveOnObserver 对象
                worker.schedule(this);
            }
        }
}

可以看到 ObserveOnObserver#onSubscribe(Disposable s) 中一定会调用 actual.onSubscribe(this);,其中这个 this 就是 ObserveOnObserver 对象,也就是让我们最外层的 observer 订阅了 ObserveOnObserver。

可以看到在 RxJava 中运用的操作符都会在内部创建一个 Observable 和 Observer,所以外界使用起来和简单,但是里面运行的原理倒是挺复杂的,容易让人混淆。

运行完 ObserveOnObserver#onSubscribe(Disposable s) 后,就轮到了 source.subscribe(parent);(这里的额 source 就是我们在最外层创建的 ObservableOnSubscribe 对象),也就是说我们的 ObservableOnSubscribe#subscribe(emitter) 运行在当前线程。到这里的分析都很符合我们打印的结果。

而我们在最外层,只是让发射器 emitter 简单地发送了一个 Next 事件。这个事件会被谁接收呢?

static final class CreateEmitter<T> extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //这里的 observer 就是 ObserveOnObserver 对象
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //这里的 observer 就是 ObserveOnObserver 对象
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        
        ......
}

我们从之前调用到 ObservableCreate#subscribeActual() 可以知道,当时传进来的 parent 是 ObserveOnObserver 对象。所以发射器 emitter 发射的事件会被 ObserveOnObserver 接收。

可以看到 ObserveOnObserver.onNext() 中最后执行了 schedule(),也就是在这里进行了线程切换的操作。

由于我们传入的 Scheduler 是 IO 线程,我们看看这个 IO Schedule 的 worker.schedule(this)

一路追踪,终于找到了这个 IOScheduler 的庐山真面目:

public final class IoScheduler extends Scheduler {

private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;

    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;
    
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    ......
    
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //放到线程池中执行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

    ......
}

至此,你可以看到调用了 observerOn() 方法的全过程,只是会改变观察者 observer 的 onNext()、onComplete() 方法的运行线程,不会改变被观察者 Observable 的运行线程。

四、observeOn() 切换线程原理小结

看完整个过程,我们知道当我们使用 observeOn(Schedulers.io())的时候,其实 Rxjava 在内部帮我们创建封装了若干个中间对象的 Observable 和 Observer。然后将这个订阅操作放在 Rxjava 的线程池进行,达到切换线程的功能。

被观察者 Observable 的变化过程:Observable ==> ObservableCreate ==> ObserbvableObserveOn。

观察者 Observer 的变化过程:Observer ==> ObserveOnObserver,然后传到 ObservableEmitter<String> emitter 里面,作为发射器的 observer 成员变量。

总之,Observable#observeOn(Scheduler) 的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete() 置于指定线程中运行。

五、subscribeOn() 栗子

new Thread("子线程"){
          @Override
          public void run() {
              Observable
                      .create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      })
                      .subscribeOn(Schedulers.io())
//                      .observeOn(Schedulers.io())
                      .subscribe(new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable d) {
                              Log.e(TAG, "observer#onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onNext(String s) {
                              Log.e(TAG, "observer#onNext(): 所在线程为 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onError(Throwable e) {
                          }

                          @Override
                          public void onComplete() {
                              Log.e(TAG, "observer#onComplete(): 所在线程为 " + Thread.currentThread().getName());
                          }
                      });
          }
      }.start();

输出结果:

E/Rxjava: observer#onSubscribe(): 所在线程为 子线程
E/Rxjava: ObservableOnSubscribe#subscribe(): 所在线程为 RxCachedThreadScheduler-2
E/Rxjava: observer#onNext(): 所在线程为 RxCachedThreadScheduler-2
E/Rxjava: observer#onComplete(): 所在线程为 RxCachedThreadScheduler-2

六、ObservableCreate.subscribeOn()

由上文可以,Observable.create() 会生成一个 ObservableCreate 对象。我们看看 ObservableCreate.subscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler) {
        //过滤无关紧要的代码
        //this 是 ObservableCreate 对象
        return new ObservableSubscribeOn<T>(this, scheduler);
    }

可以看到将 ObservableCreate 对象封装成了 ObservableSubscribeOn 对象,然后就会执行 ObservableSubscribeOn#subscribeActual()

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    //这里的 source 是 ObservableCreate 对象,scheduler 是 IoScheduler
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //s 是最外层的 observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        //调用 observer#onSubscribe
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    ......
}

可以看到 observer#onSubscribe() 仍在在当前线程中执行,之后的 observer 和 ObservableOnSubscribe 的方法都被线程切换类 IoScheduler 切换到了其他线程。

我们看看 IoScheduler 的 scheduler.scheduleDirect(new SubscribeTask(parent)))

final class SubscribeTask implements Runnable {
        //这个 parent 就是 SubscribeOnObserver
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //source 就是 ObservableCreate 对象
            //parent 就是 SubscribeOnObserver 对象
            source.subscribe(parent);
        }
    }
    
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            //actual 就是最外层的 observer 
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
        
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

可以看到将最外层的 observer 包装成 SubscribeOnObserver 对象,然后包装成一个 SubscribeTask(可以执行的任务)。当在线程池中被执行的时候,会执行 SubscribeTask#run()

我们再看 IoSchedule#scheduleDirect(subscribeTask)

在 IoSchedule 的父类 Schedule 中找到一个方法:

public abstract class Scheduler {
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        //其实就是 EventLoopWorker#schedule()
        w.schedule(task, delay, unit);

        return task;
    }
}

又回到了 IOScheduler 创建的 EventLoopWorker 中:

static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            
            //最终放进线程池中执行任务
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
    ......
    
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                //放进线程池中执行
                f = executor.submit((Callable<Object>)sr);
            } else {
                //放进线程池中执行
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    
    ......
}

最终在线程池中执行了我们的 SubscribeTask#run()。其实从 scheduleActual() 和 subscribeActual() 的命名方式可以看出,Rxjava 很多地方都用到了静态工厂模式,都是父类提供抽象方法,具体的子类根据需要实现不同的逻辑,这个就很灵活了。

我们再看看 SubscribeTask#run() 干了什么:

final class SubscribeTask implements Runnable {
        //这个 parent 就是 SubscribeOnObserver
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //source 就是 ObservableCreate 对象
            //parent 就是 SubscribeOnObserver 对象
            source.subscribe(parent);
        }
    }

**那其实就是 ObservableCreate.subscribe(SubscribeOnObserver),这就又跳到了我们熟悉的 ObservableCreate.subscribeActual() 方法中了。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //source 是最外层的 ObservableOnSubscribe 对象
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //observer 是 SubscribeOnObserver 对象,里面包含最外层的 observer 对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //这里 SubscribeOnObserver 只是简单地 set 了一个引用
        observer.onSubscribe(parent);

        try {
            //其实就是 ObservableOnSubscribe.subscribe(SubscribeOnObserver);
            //此时已经运行在 Rxjava 的线程池中
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    ......
}

在这里会先执行 observer (SubscribeOnObserver)的 onSubscribe() 方法,这个方法就 set 了一个引用,可以先忽略。接下来会调用 ObservableOnSubscribe.subscribe(SubscribeOnObserver)

Observable.create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      });

我们在最外层只发送了一个 Next 事件,根据 CreateEmitter<T> 类的源码:

public void onNext(T t) {
           
            if (!isDisposed()) {
                //observer 就是 SubscribeOnObserver 
                observer.onNext(t);
            }
        }

SubscribeOnObserver.onNext() 会触发:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            //最外层的 observer 
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            //调用最外层的 observer#onNext()
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            //调用最外层的 observer#onComplete()
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

所以,我们最外层的 observer 的 onNext() 和 onComplete() 会运行在 Rxjava 的线程池的线程中。

至此,subscribeOn(Schedulers.io()) 的过程分析完毕,subscribeOn(Schedulers.io()) 会改变观察者 observer 的 onNext()、onComplete() 方法的运行线程,也会改变被观察者 Observable 的运行线程。

七、subscribeOn() 切换线程原理小结

看完整个过程,我们知道当我们使用 subscribeOn(Schedulers.io())的时候,其实跟上面的 observeOn(Schedulers.io()) 过程差不多,Rxjava 帮我们创建了若干个中间层的 Observable 和 Observer,然后将这个订阅操作放在 Rxjava 的线程池进行,达到切换线程的功能。

被观察者 Observable 的变化过程:Observable ==> ObservableCreate ==> ObserbvableSubscribeOn。

观察者 Observer 的变化过程:Observer ==> SubscribeOnObserver,然后传到 ObservableEmitter<String> emitter 里面,作为发射器的 observer 成员变量。

总之,Observable#subscribeOn(Scheduler) 的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete() 和 ObservableOnSubscribe 的 subscribe(T) 置于指定线程中运行。

八、subscribeOn() 和 observeOn(Schedulers.io()) 一起使用

这两个 api 一起使用其实也不会有什么很大的变化,就是 observeOn() 会影响 Observer 的 onNext(T)/onError(Throwable)/onComplete() 运行线程,而 subscribeOn() 会影响 ObservableOnSubscribe 的 subscribe(T) 运行线程。

上一篇下一篇

猜你喜欢

热点阅读