观察者模式在RxJava中的运用(一)RxJava整体框架分析
观察者模式在RxJava中的运用(一)RxJava整体框架分析
1、传统观察者模式的定义
-
抽象被观察者角色
Observable接口:声明add、remvoe、notifyObservers方法 -
抽象观察者角色
Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。 -
具体观察者角色
实现Observer接口 -
具体被观察者角色(也叫具体主题)
实现Observable接口,定义一个集合存储所有的观察者
通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。
2、观察者模式在java(jdk)中的体现
-
抽象被观察者角色
Observable类,定义了存储观察者的集合(Vector<Observer> obs)和add、remvoe、notifyObservers方法,
用Vector不用list的原因是考虑到线程安全的问题, -
抽象观察者角色
Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。 -
具体观察者角色
实现Observer接口 -
具体被观察者角色
实现Observable接口,通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。
3、观察者模式和发布订阅模式区别和联系
观察者模式和发布订阅模式其实它们的主要思想是一样的,但是在观察者模式中,被观察者里保存了所有的观察者(集合),
而在发布订阅模式中,被观察者里是不保存观察者集合的。发布订阅模式比起观察者模式,耦合度更低而已
4、观察者模式在RxJava中的体现
//1. 创建一个Observable 可被观察的
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if(!emitter.isDisposed()){
emitter.onNext("hello rxjava");
emitter.onNext("1234");
}
emitter.onComplete();
}
});
//2. 创建一个Observer 观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG,"onSubscribe: " + d);
}
@Override
public void onNext(String s) {
Log.i(TAG,"onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG,"onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete ");
}
};
//3 观察者通过订阅(subscribe)被观察者 把它们连接到一起
//observer(观察者) 订阅 observable(被观察者)
observable.subscribe(observer);
-
1、在jdk中被观察者是通过add往集合里添加观察者的,RxJava中是通过observable.subscribe(observer)的方式。
-
2、那么在RxJava中前面说的四个重要的角色是怎么定义的呢?它们在RxJava中是怎么实例化的?是怎么订阅和消息传递的呢?
5、RxJava源码分析
我们先看MainActivity类里的Observable.create方法
MainActivity#Observable.create
Observable#public static <T> Observable<T> create
ObservableCreate# ObservableCreate<T> extends Observable<T>
看到ObservableCreate<T> extends Observable<T>其实我们能联想到在jdk中的观察者模式,
-
具体被观察者角色
final class ObservableCreate<T> -
抽象被观察者角色
abstract class Observable<T>
我们再看到MainActivity类里的new Observer<String>()方法
MainActivity#new Observer<String>()
interface Observer<T>
-
抽象观察者角色
interface Observer<T>
onNext方法就相当于我们前面提到的update方法 -
具体的观察者角色
匿名类Observer<String> observer就是我们在MainActivity中通过new一个Observer接口的方式产生的匿名类observer。这个匿名类就是
具体的观察者,最后通过observable.subscribe(observer);把观察者和被观察者绑定,产生订阅-被订阅关系。我们知道,在java中抽象被观察者里是有一个集合用来存放了观察者的,所以我们进到抽象类Observable的subscribe方法看看
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
前面是一些判空操作,进入subscribeActual,我们发现subscribeActual是一个抽象方法,那么在抽象被观察者角色Observable里的具体
实现肯定是在具体被观察者角色ObservableCreate里的subscribeActual
ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
通过new CreateEmitter<T>(observer)创建一个发射器parent,并把观察者者传入到发射器CreateEmitter里
再调用抽象观察者对象Observer的onSubscribe,因为是抽象类的抽象方法,所以实际上就是调用我们在MainActivity创建的匿名类observer里的
onSubscribe方法,这里把发射器传入到onSubscribe里,发射器具体传入到这里面什么作用,我们下面再分析,我们先看下一句
source.subscribe(parent);,这一句代码实际上就是真正产生订阅操作的关键代码,这里的source就是我们上面Observable.create
时候传进来的ObservableOnSubscribe对象。所以这里的onSubscribe方法就是MainActivity里Observable.create的创建ObservableOnSubscribe的时候实现的subscribe并把parent传了进去,parent是CreateEmitter<T>对象,subscribe里面调用的next实际就是调用了
parent的next方法,也就是CreateEmitter类的next方法。我们再看到CreateEmitter#next
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
里面调用了抽象观察者observer.onNext(t);就是调用到了MainActivity里匿名类observer的next方法。在调用onNext之前,判断了isDisposed
的值,这个isDisposed()值其实就是上面分析的observer.onSubscribe(parent);,我们可以在匿名类的onSubscribe方法里去中断事件。
到这里我们RxJava里整个创建观察者和被观察者,及其绑定和收发消息的流程就都通了。
其实我们把RxJava里实现的观察者模式看成发布订阅模式更为好理解一些,因为它在实现的时候,在被观察者中也是没有存储观察者对象,
是把观察者传到了CreateEmitter发射器里这一点和发布订阅模式是类似的。
最后我们总结下:
在RxJava中四个重要的角色
-
具体被观察者角色
final class ObservableCreate<T> -
抽象被观察者角色
abstract class Observable<T> -
抽象观察者角色
interface Observer<T> -
具体的观察者角色
匿名类Observer<String> observer
一开始,我们会创建一个observable对象,然后调用subscribe里的发射器的onNext发送消息;第二步:创建一个Observer匿名类观察者,在onNext里接收消息,这里可以在onSubscribe里通过Disposable
中断这个事件。