观察者设计模式
前言
观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。
观察者模式是一种对象行为型模式,其主要优点如下: - 降低了目标与观察者之间的耦合关系,两者之间是抽象耦合关系。符合依赖倒置原则。 - 目标与观察者之间建立了一套触发机制。
它的主要缺点如下: - 目标与观察者之间的依赖关系并没有完全解除,而且有可能出现循环引用。 - 当观察者对象很多时,通知的发布会花费很多时间,影响程序的效率。
生活小案列
举例说明:领导创建了一个名为摸鱼先锋队的群,用于团建事项的通知,领导当晚发布了一条消息--这次团建新入职的员工需要准备表演节目。这个时候群里所有人都能看到这条消息,只有新员工收到这个通知后会有一系列表演的准备工作要做,然而这条消息对老员工没有任何的影响。
这就是一个观察者模式的生活案例。当领导有事的时候发布通知到群里,群里的所有人收到通知后做相应的事情。 以上案例中可以分为下面几个角色: - 监听者(也可以说是观察者):群里面每一个人都是一个监听者。 - 管理者(对应其他教程中的主题-subject):也就是群,主要有添加(群成员)监听者,移除(群成员)监听者,还有通知所有监听者的功能。 - 事件(或者说通知):也就是领导发到群里面的消息是一个事件(或者说通知)。
通过上面的例子来整理一下实现一个观察者模式的思路。 看看一个流程:领导创建群将相关人员添加到群,然后向群里面发布一个通知。群里面每个人看到这条消息后,做相应的事情,当这个消息与自己无关时,啥也不做。 领导在这里面可以看做是应用程序的一个线程,只是程序执行的一个单元而已。 接下来就是一般设计模式都有的套路,为了程序的扩展性。上面的几个角色都需要定义成抽象的概念,那么在Java里面定义抽象有两种一个是接口一个是抽象类。具体定义成接口还是抽象类根据实际情况自行选择。
抽象概念
为什么要定义成抽象的呢?我们先了解一下抽象的概念,我理解抽象就是对一类事物公共部分的定义。比如说水果,就是对一类事物的抽象定义,说到水果,大家肯定能联想到,多汁且主要味觉为甜味和酸味,可食用的植物果实,有丰富的营养成分。这个就是水果的公共成分,但是水果又分为多种,火龙果,百香果···。 抽象的好处:比如今天你家里只有一种水果-火龙果。你爹叫你拿一点水果来吃,那你肯定就能直接把家里唯一的水果火龙果拿过来孝敬你老爹。在这个过程中你爹说的水果而不是火龙果,能够少说一个字从而节约能量多活一纳秒。那么我们可以得出一个结论-使用抽象概念可以延年益寿→_→。 开个玩笑,下面言归正传,我说一下我认为抽象的好处: - 当接口只定义一个实现类时,方便功能的替换(换一个实现类,在新实现类新增功能。从而避免了对调用方和原实现类原代码的改动)。 - 方法形参定义为抽象,这时就能实现传入不同的实现类该方法可以实现不同的功能。 - 统一管理,让程序更规范化,当抽象中定义新的非抽象方法,子类可以直接继承使用。
有了上面的铺垫,很容易理解下面的代码示例。
观察者模式代码示例
观察者模式其实也是发布订阅模式。 针对不同的观察者需要有不同的实现方式,所以先创建一个管理者的接口,将其定义为一个抽象概念,方便后续扩展。 这个接口相当于-群(管理者)
/**
* 观察者的顶层接口
* @param <T>
*/
public interface ObserverInterface<T> {
//注册监听者
public void registerListener(T t);
//移除监听者
public void removeListener(T t);
//通知监听者
public void notifyListener(DataEvent t);
}
定义抽象的监听者接口 这个接口相当于-群成员(监听者)
/**
* Listener的顶级接口,为了抽象Listener而存在
*/
public interface MyListener {
void onEvent(DataEvent event);
}
定义抽象的事件接口 这个接口相当于群里面发布的通知
@Data
public abstract class DataEvent {
private String msg;
}
创建管理者的实现类,相当于具体的群(如微信群,钉钉群)
/**
* 循环调用方式的观察者(同步)
*/
@Component
public class LoopObserverImpl implements ObserverInterface<MyListener> {
//监听者的注册列表
private List<MyListener> listenerList = new ArrayList<>();
@Override
public void registerListener(MyListener listener) {
listenerList.add(listener);
}
@Override
public void removeListener(MyListener listener) {
listenerList.remove(listener);
}
@Override
public void notifyListener(DataEvent event) {
for (MyListener myListener : listenerList) {
myListener.onEvent(event);
}
}
}
创建两个event的实现类,一个是积分事件,一个是短信事件
/**
* 积分事件类
*/
public class ScoreDataEvent extends DataEvent {
private Integer score;
}
/**
* 短信事件类
*/
public class SmsDataEvent extends DataEvent {
private String phoneNum;
}
创建两个listener的实现类,一个是处理积分的,一个是处理短信的
/**
* MyListener的实现类,分数监听者
*/
@Component
public class MyScoreListener implements MyListener {
@Override
public void onEvent(DataEvent dataEvent) {
if (dataEvent instanceof ScoreDataEvent) {
//...省略业务逻辑
System.out.println("积分处理:" + dataEvent.getMsg());
}
}
}
/**
* MyListener的实现类,短信监听者
*/
@Component
public class MySmsListener implements MyListener {
@Override
public void onEvent(DataEvent dataEvent) {
if (dataEvent instanceof SmsDataEvent) {
//...省略短信处理逻辑
System.out.println("短信处理");
}
}
}
观察者模式的要素就到齐了,我们在main方法里面跑一下
public class Operator {
public static void main(String[] args) {
//通过spring的AnnotationConfigApplicationContext将com.example.demo.user.admin.design路径下的所有加了spring注解的类都扫描放入spring容器
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.example.demo.user.admin.design");
//从spring容器中获取对应bean的实例
LoopObserverImpl loopObserver = context.getBean(LoopObserverImpl.class);
MyScoreListener scoreL = context.getBean(MyScoreListener.class);
MySmsListener smsL = context.getBean(MySmsListener.class);
//向观察者中注册listener
loopObserver.registerListener(scoreL);
loopObserver.registerListener(smsL);
ScoreDataEvent scoreData = new ScoreDataEvent();
scoreData.setMsg("循环同步观察者");
//发布积分事件,通知监听者
loopObserver.notifyListener(scoreData);
}
}
异步方式
不同之处就是引入了阻塞队列,让通知这个操作变成异步操作,既只需要将event时间放入阻塞队列之后就可以直接返回了。不用像LoopObserverImpl要等到listener注册表循环完毕才能返回。这样就实现了通知操作和循环listener注册表的解耦和异步。
举例说明异步实现和同步实现的区别: 同步:还是团建群的例子,假如领导是保姆型领导,通知下来任务之后可能不太放心,要挨个问,小张你准备什么表演阿,大概多久能准备好鸭。小红你呢→_→。。。 异步:假如是甩手掌柜型领导,发布完消息之后他就不管了。 上面就是同步和异步的区别,同步就是领导是个保姆,挨个问挨个了解情况之后这个事情才算完。异步就是领导发布完消息就完事儿。
/**
* 启动一个线程循环阻塞队列的观察者,可以实现解耦异步。
*/
@Component
public class QueueObserverImpl implements ObserverInterface<MyListener> {
//监听者的注册列表
private List<MyListener> listenerList = new ArrayList<>();
//创建一个大小为10的阻塞队列
private BlockingQueue<DataEvent> queue = new LinkedBlockingQueue<>(10);
//创建一个线程池
private ExecutorService executorService = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setName("com.kangarooking.observer.worker");
t.setDaemon(false);
return t;
});
// private ExecutorService executorService = Executors.newFixedThreadPool(1);
@Override
public void registerListener(MyListener listener) {
listenerList.add(listener);
}
@Override
public void removeListener(MyListener listener) {
listenerList.remove(listener);
}
@Override
public void notifyListener(DataEvent event) {
System.out.println("向队列放入DataMsg:" + event.getMsg());
queue.offer(event);
}
@PostConstruct
public void initObserver() {
System.out.println("初始化时启动一个线程");
executorService.submit(() -> {
while (true) {
try {
System.out.println("循环从阻塞队列里面获取数据,take是阻塞队列没有数据就会阻塞住");
DataEvent dataMsg = queue.take();
System.out.println("从阻塞队列获取到数据:" + dataMsg.getMsg());
eventNotify(dataMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
private void eventNotify(DataEvent event) {
System.out.println("循环所有的监听者");
for (MyListener myListener : listenerList) {
myListener.onEvent(event);
}
}
}
调用一下
public class Operator {
public static void main(String[] args) {
//通过spring的AnnotationConfigApplicationContext将com.example.demo.user.admin.design路径下的所有加了spring注解的类都扫描放入spring容器
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.example.demo.user.admin.design");
/*******************************************/
//从spring容器获取QueueObserverImpl观察者
QueueObserverImpl queueObserver = context.getBean(QueueObserverImpl.class);
//向观察者中注册listener
queueObserver.registerListener(scoreL);
queueObserver.registerListener(smsL);
ScoreDataEvent scoreData1 = new ScoreDataEvent();
scoreData1.setMsg("队列异步观察者");
//发布积分事件,通知监听者
queueObserver.notifyListener(scoreData1);
}
}
开源框架的实现
同步方式
spring的发布订阅就是基于同步的观察者模式: 简单来说就是将所有的监听者注册到一个列表里面,然后当发布事件时,通过循环监听者列表,在循环里面调用每个监听者的onEvent方法,每个监听者实现的在onEvent方法里面判断传入的event是否属于当前需要的event,属于就处理该事件,反之不处理。
spring的ApplicationEventMulticaster就是示例讲的观察者顶层接口
image.png
ApplicationListener就是示例代码的监听者顶层接口
image.png
在refresh方法里面调用的registerListeners();方法就是将所有的监听者实现类注册到观察者的注册表中
image.png
ApplicationEventMulticaster的multicastEvent方法就是上面讲的通知方法,这里就是循环监听者注册表,调用每个监听者的onApplicationEvent方法(这里的invokeListener方法里面最终会调用到listener.onApplicationEvent(event);)
image.png
随便看一个onApplicationEvent方法的实现,跟上面的例子是不是很相似
image.png
异步方式
nacos中有很多地方都使用到了观察者模式,如client端和server端建立连接,发布连接事件,相关监听者做相应的处理,断开连接也是一样。
在server端接收到client端的注册请求后,会发布一个注册事件的通知
image.png
在nacos-server启动的时候也是会开启一个线程做死循环,循环的去queue里面take数据,如果没有的话就会阻塞。所以死循环只有在queue里面一直有数据的时候才会一直循环,当queue里面没有数据的时候就会阻塞在queue.take();方法处。
image.png
我们看看receiveEvent(event);方法里面做了什么,这里就体现了框架里面设计的精妙:在上面我们自己的设计中,这里应该是需要循环调用所有的listener的onApplicationEvent方法,但是当注册表中listener太多的时候就会出现(有些event可能会有多个listener需要处理)循环调用太慢的问题,这里使用多线程的处理方式,让这些调用并行处理,大大的提高了框架的事件处理效率。
image.png
关于业务使用场景
可以说观察者模式能解决的,消息队列也可以解决,并且可以做的更好。主要根据实际情况取舍。
考虑使用消息队列的情况
当公司的服务器资源充足,并且用户量大,相关业务逻辑调用频繁,消息要求高可靠性,以及消息要求发布订阅更灵活,就可以考虑使用消息队列。
考虑使用观察者模式的情况
当服务器资源不充足,或者调用比较少,或者希望使用轻量的通知机制,对于消息可靠性要求不高,可以考虑在项目代码里面使用观察者模式。 当然使用观察者模式比较麻烦的一点就是要自己写一定量的代码,而且功能还不如消息队列的强大,并且不能保证消息的可靠性,当观察者获取消息在自己的处理逻辑里面产生异常时,可能还需要自己先写好发生异常后的降级代码(当然如果对可靠性要求不高的业务场景就不需要)。
为什么框架使用观察者模式而不使用消息队列(个人理解):
-
消息队列太重;
-
本身就是开源框架(本身代表原创),不适合再引入另一个很重的消息队列。如果引入会增加用户的使用和部署成本以及难度,对于自身的推广也不利。