api 架构

RxJava2框架源码分析二(Create篇)

2019-12-03  本文已影响0人  yqianqiao

1.回顾

上篇已经介绍了RxJava的基本概念以及用法 RxJava2基本框架分析一(基础篇)

2.实例讲解

       // RxJava的链式操作
        // 1. 创建被观察者(Observable) & 定义需发送的事件
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        // 2. 创建观察者(Observer) & 定义响应事件的行为
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }

        };
        // 3. 通过订阅(subscribe)连接观察者和被观察者
        observable.subscribe(observer);

3. 源码分析

下面,我讲根据 使用步骤 进行RxJava2的源码进行分析
步骤1:创建被观察者(Observable)&定义需发送的事件
步骤2:创建观察者(Observer)&定义响应事件的行为
步骤3:通过订阅(subscribe)连接观察者和被观察者

步骤一:创建被观察者(Observable)

// 1. 创建被观察者(Observable) & 定义需发送的事件
 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
 /**
  * 源码分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
  *  create 操作主要是创建了 ObservableCreate 对象并且返回出去
 */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判断source是否为空  
        ObjectHelper.requireNonNull(source, "source is null");
        //hook函数:判断是否需要再原对象加上一些代码操作(暂时可以当做返回对象本身)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
  
  /**
   * 下面我们来看看 ObservableCreate 对象里面做了什么操作
   */
    public final class ObservableCreate<T> extends Observable<T> {
    // ObservableCreate 是Observable的子类
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //构造函数
        //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }
  //这里需要留心关注subscribeActual方法后面会讲到

步骤二创建观察者(Observer)

/** 
  * 使用步骤2:创建观察者 & 定义响应事件的行为(方法内的创建对象代码)
  **/
 // 2. 创建观察者(Observer) & 定义响应事件的行为
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }

        };
/** 
  * 源码分析Observer类
  **/
     public interface Observer<T> {
        // 注:Observer本质 = 1个接口
        // 接口内含4个方法,分别用于 响应 对应于被观察者发送的不同事件
        void onSubscribe(@NonNull Disposable d); // 内部参数:Disposable 对象,可结束事件
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }

步骤三:通过订阅(subscribe)连接观察者和被观察者

 // 3. 通过订阅(subscribe)连接观察者和被观察者
        observable.subscribe(observer);

/** 
  * 源码分析:Observable.subscribe(observer)
  * 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
  **/  
public abstract class Observable<T> implements ObservableSource<T> {
     ...
    // 仅贴出关键源码
  @Override
  public final void subscribe(Observer<? super T> observer) {
         ...
         // 仅贴出关键源码
        //可以看到调用的是本类的下面抽象方法
         subscribeActual(observer); 
   }
    //定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
   protected abstract void subscribeActual(Observer<? super T> observer);
}

/**
*  现在我们回到先前创建的被观察者中 ObservableCreate类 
**/
public final class ObservableCreate<T> extends Observable<T> {
 // ObservableCreate 是Observable的子类
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //构造函数
        //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }

   /** 
      * 重点关注:复写了subscribeActual()
      * 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
      **/
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      //1. 创建1个CreateEmitter对象(封装成一个Disposable对象)
      //作用:发射事件
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
      //2. 调用观察者(Observer)的onSubscribe()
     // onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
       //将Disposable(CreateEmitter) 传到观察者onSubscribe(Disposable d) 参数中,使之可以解除订阅
        observer.onSubscribe(parent);

        try {
            //3.调用source对象的subscribe()方法
            // source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象
            //subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()
            //将CreateEmitter对象传递给被观察者进行对象方法的调用(onNext(),onComplete()...)
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

  /** 
    * 分析2:emitter.onNext("1");
    * 此处仅讲解subscribe()实现中的onNext()
    * onError()、onComplete()类似,此处不作过多描述
    **/
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //初始化讲观察者赋值到全局变量observer
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
          //当被观察者调用onNext()方法时,回调此方法(步骤一中创建Observable.create()匿名内部类中的onNext())
            //发送的事件不能为null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
          //判断是否断开连接(调用Disposable.dispose())
          //没有断开的话,则调用观察者中的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
}

步骤3总结:当被观察者订阅观察者的时候,会调用被观察者ObservablesubscribeActual()抽象方法,回调其子类重新的subscribeActual()方法。这方法里面有三个步骤:

4. 源码总结

上一篇 下一篇

猜你喜欢

热点阅读