RxJava 中的设计模式(一)观察者模式

2021-06-26  本文已影响0人  蓝笔头

介绍

观察者模式(Observer Pattern)定义了对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知,并自动更新。

观察者模式属于行为型模式,行为型模式关注的是对象之间的通讯,观察者模式就是观察者和被观察者之间的通讯。

观察者模式有一个别名叫 发布-订阅模式(Publish Subscribe Pattern),订阅者和订阅目标是联系在一起的,当订阅目标发生改变时,逐个通知订阅者。

实现

观察者模式

代码:

@FunctionalInterface
public interface Observer<T> {
    void handle(T t);
}

public class Observable<T> {
    private List<Observer> observers = new ArrayList<>();

    public void subscribe(Observer<T> observer) {
        observers.add(observer);
    }

    public void notifyAll(T data) {
        observers.forEach(observer -> observer.handle(data));
    }
}

@Slf4j
public class Main {

    public static void main(String[] args) {
        Observable<String> observable = new Observable<>();
        observable.subscribe(data -> log.info("我是第一个观察者 {}", data));
        observable.subscribe(data -> log.info("I am the second observer {}", data));

        observable.notifyAll("通知");
    }
}

输出:

21:26:58.969 [main] INFO org.company.rxjava.pattern.Main - 我是第一个观察者 通知
21:26:58.979 [main] INFO org.company.rxjava.pattern.Main - I am the second observer 通知

可观察对象(Observable)内部维护了观察者(Observer)的引用集合,在有事件发生时,通过 notifyAll() 方法通知所有的观察者(Observer)。

RxJava 中的观察者模式

代码:

public interface Observer<T> {
    void onNext(T t);
}

public class Emitter<T> {
    private Observer<T> observer;

    public Emitter(Observer<T> observer) {
        this.observer = observer;
    }

    public void onNext(T t) {
        this.observer.onNext(t);
    }
}

public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter);
}

public class Observable<T> {
    private ObservableOnSubscribe<T> source;

    public Observable(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new Observable<>(source);
    }

    public void subscribe(Observer<T> observer) {
        Emitter<T> emitter = new Emitter<>(observer);
        this.source.subscribe(emitter);
    }
}

@Slf4j
public class Main {

    public static void main(String[] args) {
        // 1. 创建一个 Observable 对象
        // 2. ObservableOnSubscribe 的 subscribe(Emitter emitter) 方法表示在 Observer 订阅时需要执行的操作
        // 3. 通过实现 ObservableOnSubscribe 接口,延迟 subscribe(Emitter emitter) 中 emitter 的逻辑处理
        Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(Emitter emitter) {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        });

        // 4. Observer 订阅到 Observable 上
        // 5. subscribe() 会创建一个 Emitter 并把 observer 关联到 Emitter 上。
        // 6. 调用 ObservableOnSubscribe 的 subscribe() 方法,开始事件流处理
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer o) {
                log.info("onNext {}", o);
            }
        });
    }
}

输出:

22:06:38.601 [main] INFO org.company.rxjava.pattern.Main - onNext 1
22:06:38.610 [main] INFO org.company.rxjava.pattern.Main - onNext 2
22:06:38.611 [main] INFO org.company.rxjava.pattern.Main - onNext 3

类结构分析:

subscribe() 被调用时,才开始整个事件流处理。
在此之前只是设置一些事件处理上下文。

参考

上一篇下一篇

猜你喜欢

热点阅读