Rx系列RxJavaRxJava系列专题(Android方向)

RxJava 2.0----你不知道的Schedulers

2017-11-12  本文已影响41人  Calllanna

一.引言

RxJava中 使用observeOn(Schedulers s)和subscribeOn(Schedulers s)是任务调度的操作符。subscribeOn(Schedulers s) 指示Observable将任务(数据的发射以及数据的处理)放在指定的调度器上的执行 ,observeOn(Schedulers s)指示一个Observer在一个指定的调度器上调用onNext, onError和onCompleted等方法。即subscribeOn决定任务的发射的线程,observeOn决定任务的接收线程。

二.Schedulers任务调度

先看看下面的例子,先猜猜任务调度执行的线程:

//-------------------------- 1   默认主线程-----------------
tx_console.setText("");
printThread("1 默认主线程 ");

Observable.create(observableOnSubscribe)
        .flatMap(function)
        .subscribe(consumer);

//---------------------------2   指定 Observable 的调度器---------------
printThread("2  指定 Observable 的调度器");
Observable.create(observableOnSubscribe)
        .flatMap(function)
        .subscribeOn(Schedulers.newThread())
        .subscribe(consumer);

//---------------------------3    默认新线程---------------
new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printThread("  默认新线程");
        Observable.create(observableOnSubscribe).flatMap(function).subscribe(consumer);
    }
}).start();

//---------------------------    新线程 指定  Observable ,Observer的调度器---------------

new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printThread("    新线程 指定  Observable ,Observer的调度器");
        Observable.create(observableOnSubscribe).flatMap(function)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);
    }
}).start();
Observable.timer(4,TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                printThread("    timer 调度器");
            }
        });


结果输出:

结果输出

从上面的输出结果中,我们大概知道了下面几点:

  1. RxJava中不同的调度器可以指定在不同的线程中执行 。
  2. Create创建的Observable默认在当前线程中执行任务流,并在当前线程观察
  3. 没有调用observeOn指定观察者调度器,观察者默认在Observable发射线程里执行
  4. timer创建的Observable会在一个叫Computation的线程中执行任务流
  5. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换

三. subscribeOn()原理

subscribeOn()用来指定Observable在哪个线程中执行事件流, 通过源码分析subscribeOn可以知道是Observable怎样实现线程的切换的。

1.subscribeOn方法,创建一个 ObservableSubscribeOn对象

 public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

2.ObservableSubscribeOn实现subscribeActual()方法, 保证当前Observer的Disposable 只设置一次,后来的设置无效,并且在指定的调度器重新订阅

   @Override
public void subscribeActual(final Observer<? super T> s) {
//  parent表示目标观察者
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        //静态对象里的常量,保证初始化一次
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
          //保证当前Observer的Disposable 只设置一次,后来的设置无效
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
       //保证当前Observer的Disposable 只设置一次,后来的设置无效
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
           //指定的调度器重新订阅
            source.subscribe(parent);
        }
    }

3. 调度器的scheduleDirect()指定调度器在自己的线程池(Worker)执行任务

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

4.IoScheduler ---IO调度器的createWorker()

  class IoScheduler extends Scheduler {
final AtomicReference<CachedWorkerPool> pool;
static {
    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
}

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
}
 static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

四. observeOn()原理

observeOn()指示一个观察者 observer 在指定的调度器上调用onNext, onError和onCompleted等方法。先来看看源码的实现过程:
1.observeOn()方法,创建 ObservableObserveOn对象

   @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

2.ObservableObserveOn实现方法subscribeActual,如果是TrampolineScheduler, TrampolineScheduler调度器表示在当前事件流的线程执行任务 ,否则在指定调度器的线程执行任务

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
      if (scheduler instanceof TrampolineScheduler) {
         // TrampolineScheduler调度器表示在当前事件流的线程执行任务 
          source.subscribe(observer);
      } else {
          Scheduler.Worker w = scheduler.createWorker();
          source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
      }
  }

3.ObserveOnObserver在调度器工作线城池中执行onNext(),onError(),onComple()

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
   implements Observer<T>, Runnable {
    //目标观察者
     final Observer<? super T> actual;
    //调度器工作线城池
      final Scheduler.Worker worker;
     //数据缓冲队列
     SimpleQueue<T> queue;

       @Override
       public void onSubscribe(Disposable s) {
           if (DisposableHelper.validate(this.s, s)) {
               this.s = s;
               ......
               queue = new SpscLinkedArrayQueue<T>(bufferSize);
               actual.onSubscribe(this);
           }
       }

       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);//数据缓存入队列
           }
           schedule();
       }

       @Override
       public void onError(Throwable t) {
           if (done) {
               RxJavaPlugins.onError(t);
               return;
           }
           error = t;
           done = true;
           schedule();
       }

       @Override
       public void onComplete() {
           if (done) {
               return;
           }
           done = true;
           schedule();
       }

       void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);//调用当前Run()方法
           }
       }
      //
       void drainNormal() {
           int missed = 1;

           final SimpleQueue<T> q = queue;
           final Observer<? super T> a = actual;

           for (;;) {
               if (checkTerminated(done, q.isEmpty(), a)) {
                   return;
               }

               for (;;) {
                   boolean d = done;
                   T v;
                   try {
                       v = q.poll(); //冲缓冲队列取数据
                   } catch (Throwable ex) {
                       Exceptions.throwIfFatal(ex);
                       s.dispose();
                       q.clear();
                       a.onError(ex);
                       worker.dispose();
                       return;
                   }
                   boolean empty = v == null;

                   if (checkTerminated(d, empty, a)) {
                       return;
                   }

                   if (empty) {
                       break;
                   }

                   a.onNext(v);//数据接收
               }
........
           }
       }

       void drainFused() {
              .......
               actual.onNext(null);
              ......
       }
       //
       @Override
       public void run() {
          //是否需要丢弃
           if (outputFused) {
               drainFused();
           } else {
               drainNormal();
           }
       }
}

五.实例演示

根据源码的分析,我们来看看下面的例子,就不难理解了。

 //---------------------------1   同时指定多个 Observable 的调度器---------------
        Observable.create(observableOnSubscribe)
                .flatMap(function)
                .subscribeOn(Schedulers.newThread())//Observable new 线程发射数据
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .subscribe(consumer);//最终observer在new 线程接收数据
        
        //---------------------------2   同时指定多个 Observable 的调度器---------------
        Observable.timer(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        printThread("2   同时指定多个 Observable ,observer 的调度器");
                        Observable.create(observableOnSubscribe).flatMap(function)
                                .subscribeOn(Schedulers.newThread())
                                .subscribeOn(Schedulers.io())
                                .subscribeOn(Schedulers.computation())//最终 Observable在 new 线程发射数据
                                .observeOn(Schedulers.newThread())
                                .observeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(consumer);//最终observer 在 main 线程接收数据
                    }
                });
 //--------------------------- 3   切换 Observable ,Observer的调度器---------------
       create(observableOnSubscribe)
                .flatMap(function)//new 1 线程接收数据
                .subscribeOn(Schedulers.newThread())//new 1线程发送数据
                .flatMap(function)//new 1 线程接收数据
                .subscribeOn(Schedulers.io())//设置无效
                .observeOn(Schedulers.newThread())//切换new 2线程接收数据
                .flatMap(function)//new 2 线程接收数据
                .subscribeOn(Schedulers.computation())//设置无效
                .flatMap(function)//new 2 线程接收数据
                .observeOn(Schedulers.io())//切换io线程接收数据
                .subscribe(consumer);//目标Observer 在io线程中接受数据 
输出结果
//---------------------------4   调用Observable 的操作符
    create(observableOnSubscribe)//io线程发送数据
                .flatMap(function)//io线程接收数据
                .subscribeOn(Schedulers.io())//指定io线程发送数据
                .flatMap(function)//io线程接收数据
                .observeOn(AndroidSchedulers.mainThread())//切换mian线程接收数据
                .flatMap(function)//mian线程接收数据
                .delay(2, TimeUnit.SECONDS)//delay操作符在Computation线程中接受数据
                .subscribe(consumer);//目标Observer 在Computation线程中接受数据 
输出结果
嗖嘎

六.调度器的种类

RxJava中可用的调度器有下面几种:

Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.single() 该调度器的线程池只能同时执行一个线程。
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行。
AndroidSchedulers.mainThread( ) 主线程,UI线程,可以用于更新界面
 //---------------------------5  对比  切换 Observable trampoline调度器---------------
        printThread("    切换 Observable io调度器");
        Observable.just("1","2","3","4","5")
                .subscribeOn(Schedulers.newThread())
                .flatMap(function)
                .observeOn(Schedulers.io())
                .flatMap(function)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);

输出结果:

输出结果
  Observable.timer(3,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        printThread("    切换 Observable trampoline调度器");
                        Observable.just("1","2","3","4","5")
                                .subscribeOn(Schedulers.newThread())
                                .flatMap(function)
                                .observeOn(Schedulers.trampoline())
                                .flatMap(function)
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(consumer);
                    }
                });

输出结果:

输出结果

六.各种操作符的默认调度器

总结了一些操作符默认的调度器:

buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan,timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retrytrampolinesample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) trampoline
skipLast(long time, TimeUnit unit, boolean delayError) trampoline
sample(long period, TimeUnit unit) computation
sample(long period, TimeUnit unit, boolean emitLast) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) trampoline
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval computation
timeout(timeoutSelector) computation
timeout(firstTimeoutSelector, timeoutSelector) computation
timeout(timeoutSelector, other) computation
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) computation
timeout(timeout, timeUnit, other) computation
timer computation
timestamp computation
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

最后,小伙伴们,有么有觉得是干货,是的话,就为我点个赞吧!

上一篇下一篇

猜你喜欢

热点阅读