RxJava 中的设计模式(一)观察者模式
2021-06-26 本文已影响0人
蓝笔头
介绍
观察者模式(Observer Pattern)定义了对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知,并自动更新。
观察者模式属于行为型模式,行为型模式关注的是对象之间的通讯,观察者模式就是观察者和被观察者之间的通讯。
观察者模式有一个别名叫 发布-订阅模式(Publish Subscribe Pattern),订阅者和订阅目标是联系在一起的,当订阅目标发生改变时,逐个通知订阅者。
实现
观察者模式代码:
@FunctionalInterface
public interface Observer<T> {
void handle(T t);
}
public class Observable<T> {
private List<Observer> observers = new ArrayList<>();
public void subscribe(Observer<T> observer) {
observers.add(observer);
}
public void notifyAll(T data) {
observers.forEach(observer -> observer.handle(data));
}
}
@Slf4j
public class Main {
public static void main(String[] args) {
Observable<String> observable = new Observable<>();
observable.subscribe(data -> log.info("我是第一个观察者 {}", data));
observable.subscribe(data -> log.info("I am the second observer {}", data));
observable.notifyAll("通知");
}
}
输出:
21:26:58.969 [main] INFO org.company.rxjava.pattern.Main - 我是第一个观察者 通知
21:26:58.979 [main] INFO org.company.rxjava.pattern.Main - I am the second observer 通知
可观察对象(Observable)内部维护了观察者(Observer)的引用集合,在有事件发生时,通过 notifyAll() 方法通知所有的观察者(Observer)。
RxJava 中的观察者模式
代码:
public interface Observer<T> {
void onNext(T t);
}
public class Emitter<T> {
private Observer<T> observer;
public Emitter(Observer<T> observer) {
this.observer = observer;
}
public void onNext(T t) {
this.observer.onNext(t);
}
}
public interface ObservableOnSubscribe<T> {
void subscribe(Emitter<T> emitter);
}
public class Observable<T> {
private ObservableOnSubscribe<T> source;
public Observable(ObservableOnSubscribe<T> source) {
this.source = source;
}
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new Observable<>(source);
}
public void subscribe(Observer<T> observer) {
Emitter<T> emitter = new Emitter<>(observer);
this.source.subscribe(emitter);
}
}
@Slf4j
public class Main {
public static void main(String[] args) {
// 1. 创建一个 Observable 对象
// 2. ObservableOnSubscribe 的 subscribe(Emitter emitter) 方法表示在 Observer 订阅时需要执行的操作
// 3. 通过实现 ObservableOnSubscribe 接口,延迟 subscribe(Emitter emitter) 中 emitter 的逻辑处理
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(Emitter emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
});
// 4. Observer 订阅到 Observable 上
// 5. subscribe() 会创建一个 Emitter 并把 observer 关联到 Emitter 上。
// 6. 调用 ObservableOnSubscribe 的 subscribe() 方法,开始事件流处理
observable.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer o) {
log.info("onNext {}", o);
}
});
}
}
输出:
22:06:38.601 [main] INFO org.company.rxjava.pattern.Main - onNext 1
22:06:38.610 [main] INFO org.company.rxjava.pattern.Main - onNext 2
22:06:38.611 [main] INFO org.company.rxjava.pattern.Main - onNext 3
类结构分析:
-
Observer
:观察者,或者称为事件处理者 -
Emitter
:事件发射器。用来发送一个或多个事件。 -
Observable
:可观察对象。用来连接事件发送者和事件处理者。 -
ObservableOnSubscribe
:让Emitter
在Observer
订阅(调用Observable.subscribe()
方法)到Observable
时,才触发事件发送逻辑。
在 subscribe()
被调用时,才开始整个事件流处理。
在此之前只是设置一些事件处理上下文。