RxJava系列专题(Android方向)Android技术知识程序员

RxJava2.X源码解析(四)

2017-07-14  本文已影响260人  Angels_安杰

更多分享:http://www.cherylgood.cn

一、前言

二、从Demo到原理

 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler) {
      //false为默认无延迟发送错误,bufferSize为缓冲区大小
      return observeOn(scheduler, false, bufferSize());
  }
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  }

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //下游的Observer
        final Observersuper T> actual;
        //调度工作者
        final Scheduler.Worker worker;
        //是否延迟错误,默认false
        final boolean delayError;
        //队列大小
        final int bufferSize;
        //存储上游Observable下发的数据队列
        SimpleQueue<T> queue;
        //存储下游Observer的Disposable
        Disposable s;
        //存储错误信息
        Throwable error;
        //校验是否完毕
        volatile boolean done;
        //是否被取消
        volatile boolean cancelled;
        //存储执行模式,同步或者异步 同步
        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
      public void onSubscribe(Disposable s) {

            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                  //1、判断执行模式并调用onSubscribe传递给下游Observer
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        //true 后面的onXX方法都不会被调用
                        done = true;
                        actual.onSubscribe(this);
                        //2、同步模式下,直接调用schedule
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        //2、异步模式下,等待schedule
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //判断执行模式并调用onSubscribe传递给下游Observer
                actual.onSubscribe(this);
            }
        }
 @Override
  public void onNext(T t) {
       //3、数据源是同步模式或者执行过error / complete 会是true
      if (done) {
          return;
      }
      //如果数据源不是异步类型,
      if (sourceMode != QueueDisposable.ASYNC) {
          //4、上游Observable下发的数据压入queue
          queue.offer(t);
      }
      //5、开始调度
      schedule();
  }
 @Override
    public void onError(Throwable t) {
        if (done) {
            //6、已完成再执行会抛一场
            RxJavaPlugins.onError(t);
            return;
        }
        //7、记录错误信息
        error = t;
        //8、标识已完成
        done = true;
        //9、开始调度
        schedule();
    }
 @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
 //关键点就是直接、简单、里面线程调度工作者调用schedule(this),传入了this
    void schedule() {
           //getAndIncrement很关键,他原子性的保证了worker.schedule(this);在调度完之前不会被再次调度
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    @Override
    public void run() {
          //outputFused一般是false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
 //从名字看是检测是否已终止
    boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
        //1、订阅已取消
        if (cancelled) {
            //清空队列
            queue.clear();
            return true;
        }
        //2、d其实是done,
        if (d) {
            //done==ture可能的情况onNext刚被调度完,onError或者onCompele被调用,
            Throwable e = error;
            if (delayError) {
                //delayError==true时等到队列为空才调用
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                //否则直接调用
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                } else
     if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        //否则未终结
        return false;
    }
void drainNormal() {
      int missed = 1;
      final SimpleQueue<T> q = queue;
      final Observersuper T> a = actual;
      //Ok,死循环,我们来看下有哪些出口
      for (;;) {
      //Ok,出口,该方法前面分析的
      if (checkTerminated(done, q.isEmpty(), a)) {
              return;
          }

          //在此死循环
          for (;;) {
              boolean d = done;
              T v;
              try {
                  //分发数据出队列
                  v = q.poll();
              } catch (Throwable ex) {
                  //有异常时终止退出
                  Exceptions.throwIfFatal(ex);
                  s.dispose();
                  q.clear();
                  a.onError(ex);
                  //停止worker(线程)
                  worker.dispose();
                  return;
              }
              boolean empty = v == null;
              //判断队列是否为空
              if (checkTerminated(d, empty, a)) {
                  return;
              }
               //没数据退出
              if (empty) {
                  break;
              }
              //数据下发给下游Obsever,这里支付者onNext,onComplete和onError主要放在了checkTerminated里面回调
              a.onNext(v);
          }
       //保证此时确实有一个 worker.schedule(this);正在被执行,
          missed = addAndGet(-missed);
       //为何要这样做呢?我的理解是保证drainNormal方法被原子性调用,如果执行了addAndGet之后getAndIncrement() == 0就成立了,此时又一个worker.schedule(this);被调用了,那么就不能执行break了
          if (missed == 0) {
              break;
          }
      }
  }

总结

相关文章

上一篇下一篇

猜你喜欢

热点阅读