Android开发Android开发经验谈Android技术知识

观察者模式在RxJava中的运用(一)RxJava整体框架分析

2020-04-18  本文已影响0人  程序员三千_

观察者模式在RxJava中的运用(一)RxJava整体框架分析

1、传统观察者模式的定义

2、观察者模式在java(jdk)中的体现

3、观察者模式和发布订阅模式区别和联系

观察者模式和发布订阅模式其实它们的主要思想是一样的,但是在观察者模式中,被观察者里保存了所有的观察者(集合),
而在发布订阅模式中,被观察者里是不保存观察者集合的。发布订阅模式比起观察者模式,耦合度更低而已

4、观察者模式在RxJava中的体现

 //1. 创建一个Observable  可被观察的
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                if(!emitter.isDisposed()){
                    emitter.onNext("hello rxjava");
                    emitter.onNext("1234");
                }
                emitter.onComplete();
            }
        });
        //2. 创建一个Observer 观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG,"onSubscribe: " + d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,"onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"onComplete ");
            }
        };
        //3 观察者通过订阅(subscribe)被观察者 把它们连接到一起
        //observer(观察者) 订阅 observable(被观察者)
        observable.subscribe(observer);


5、RxJava源码分析

我们先看MainActivity类里的Observable.create方法
MainActivity#Observable.create
Observable#public static <T> Observable<T> create
ObservableCreate# ObservableCreate<T> extends Observable<T>

看到ObservableCreate<T> extends Observable<T>其实我们能联想到在jdk中的观察者模式,

我们再看到MainActivity类里的new Observer<String>()方法
MainActivity#new Observer<String>()
interface Observer<T>

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

前面是一些判空操作,进入subscribeActual,我们发现subscribeActual是一个抽象方法,那么在抽象被观察者角色Observable里的具体
实现肯定是在具体被观察者角色ObservableCreate里的subscribeActual

ObservableCreate#subscribeActual

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

通过new CreateEmitter<T>(observer)创建一个发射器parent,并把观察者者传入到发射器CreateEmitter里
再调用抽象观察者对象Observer的onSubscribe,因为是抽象类的抽象方法,所以实际上就是调用我们在MainActivity创建的匿名类observer里的
onSubscribe方法,这里把发射器传入到onSubscribe里,发射器具体传入到这里面什么作用,我们下面再分析,我们先看下一句
source.subscribe(parent);,这一句代码实际上就是真正产生订阅操作的关键代码,这里的source就是我们上面Observable.create
时候传进来的ObservableOnSubscribe对象。所以这里的onSubscribe方法就是MainActivity里Observable.create的创建ObservableOnSubscribe的时候实现的subscribe并把parent传了进去,parent是CreateEmitter<T>对象,subscribe里面调用的next实际就是调用了
parent的next方法,也就是CreateEmitter类的next方法。我们再看到CreateEmitter#next

 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

里面调用了抽象观察者observer.onNext(t);就是调用到了MainActivity里匿名类observer的next方法。在调用onNext之前,判断了isDisposed
的值,这个isDisposed()值其实就是上面分析的observer.onSubscribe(parent);,我们可以在匿名类的onSubscribe方法里去中断事件。
到这里我们RxJava里整个创建观察者和被观察者,及其绑定和收发消息的流程就都通了。
其实我们把RxJava里实现的观察者模式看成发布订阅模式更为好理解一些,因为它在实现的时候,在被观察者中也是没有存储观察者对象,
是把观察者传到了CreateEmitter发射器里这一点和发布订阅模式是类似的。

最后我们总结下:

在RxJava中四个重要的角色

一开始,我们会创建一个observable对象,然后调用subscribe里的发射器的onNext发送消息;第二步:创建一个Observer匿名类观察者,在onNext里接收消息,这里可以在onSubscribe里通过Disposable
中断这个事件。

上一篇下一篇

猜你喜欢

热点阅读