RxJava中实现观察者模式的原理

2018-12-04  本文已影响0人  菇凉别走

前言:

    本文所有代码基于RxJava 2.2.3和RxAndroid 2.1.0,并使用kotlin来介绍RxJava中常用的操作符的用法及效果。

第一步:创建被观察者Observable。
1、 create其实就是Observable里的一个静态方法用于创建一个Observable实例。在下面这段代码中我们主动创建了两个对象实例,一个是由Observable.create方法返回的Observable实例,一个是传入create方法的参数ObservableOnSubscribe匿名对象并覆写了定义事件发送顺序的方法 ObservableOnSubscribe.subscribe。

         //第一步
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
                emitter.onNext("数据1");
                emitter.onNext("数据2");
                emitter.onNext("数据3");
                emitter.onNext("数据4");
                emitter.onError(new Exception());
                emitter.onComplete();
            }
        });

2、Observable的create方法,该方法在这种情况下实际上返回的就是新new的 ObservableCreate对象,并且会将ObservableOnSubscribe对象赋值给其内部的成员变量source

    //Observable的create方法
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    //ObservableCreate类的成员变量及其构造函数 create传入的参数ObservableOnSubscribe对象最终赋值给了source
   final ObservableOnSubscribe<T> source;
   public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
    }
    
   //RxJavaPlugins.onAssembly方法
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {//此时 f == null 所以会返回source即传入的参数不会有其他操作
            return apply(f, source);
        }
        return source;
    }

第一步结论:创建了一个ObservableCreate对象,并将决定订阅时事件调动顺序的ObservableOnSubscribe对象赋值给其成员变量source。

第二步:创建观察者Observer

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("Create","Observer的onSubscribe方法");
            }

            @Override
            public void onNext(String s) {
                Log.e("Create","Observer的onNext方法 收到的参数为:"+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("Create","Observer的onError方法");
            }

            @Override
            public void onComplete() {
                Log.e("Create","Observer的onComplete方法");
            }
        };
第二步结论:第二步就是单纯的创建了一个观察者,并覆写如上所示的四个方法。

第三步:让被观察者(Observable)和观察者(Observer)之间产生联系

//观察者订阅被观察者 两者关系发生
 observable.subscribe(observer);

1、订阅之后立即调用被观察者(observable)的subscribe方法。该方法中调用了抽象方法subscribeActual(observer);并将观察者(observer)作为参数传入传入。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned 
            a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe 
            for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //核心代码是这句这个方法在Observable类中是个抽象方法第一步创建的ObservableCreate对象,覆写了该方法,该方法的参数为
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

2、当这个创建的Observable对象调用subscribe方法时,subscribe方法会调用ObservableCreate类中覆写的subscrubeActual方法,subscrubeActual的代码如下:

   
    protected void subscribeActual(Observer<? super T> observer) {
        //将订阅的Observer对象进行封装到CreateEmitter中,作为source调用subscribe方法时的参数
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            //source即为调用create操作符时传入的用于定义调用顺序的ObservableOnSubscribe对象,在此处调用了其subscribe方法
           //传入的参数是上面封装了的CreateEmitter对象,所以我们在外面定义的调用顺序就是调用的CreateEmitter的方法如onNext,
          //onError,onComplete等方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

3、再看看CreateEmitter中对onNext,onError,onComplete的封装。三个方法都调用了观察者(observer)的相应方法,但是在调用前都对其进行了处理,判断其是否被取消。若调用了onError或者onComplete中的任意一个方法时,就会被取消,即用dispose()方法,这时 !isDisposed())的值就为false了。所以后续的观察者的其他方法都不可调用了。

        //observer对象即为我们在第二步创建的观察者(observer)
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally
                                                   not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not 
                                                                allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

第三步总结:被观察者(observable)调用subscribe方法,此方法最终将观察者(observer)对象封装成Emitter并作为参数,调用ObservableCreate中source即第一步创建时传入的参数的subscribe方法。即实现了调用第一步时定义的调用顺序。而Emitter的封装使onError及onComplete的调用做了限制,保证了onError和onComplete方法只能调用其中一个。

最后总结:

上述一系列操作,可归结为以下伪代码:

     //规定调用顺序
       ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>(){
           @Override
           public void subscribe(ObservableEmitter emitter) throws Exception {
               Log.e("Create","ObservableOnSubscribe对象的subscribe方法被调用");
               emitter.onNext("数据1");
               emitter.onNext("数据2");
               emitter.onNext("数据3");
               emitter.onNext("数据4");
               emitter.onError(new Exception());
               emitter.onComplete();
           }
       };
   //创建被观察者
       Observable<String> observable = new ObservableCreate<>(source);
       observable.source = source;
     //以上为第一步
//-------------------------------------------------------------------------
       Observer<String> observer = new Observer<String>();
//创建观察者这是第二步
//---------------------------------------------------------------------------
//调用observable.subscribe(observer)
       Emitter<String> emitter = new EmitterCreate(observer);
       observable.source.subscribe(emitter);
//第三步订阅
//-----------------------------------------------------------------------------

上一篇下一篇

猜你喜欢

热点阅读