Spring的事件驱动模型
1. 简单介绍:
事件Event驱动模型实际也被称之为观察者模式,或者发布/订阅模型。
Spring中,也为我们提供了这样的框架,采用Event/Listener这样的好处自然不用多说,就是解耦,利于扩展,并且利于一对多这种形式,我们下面就来介绍下Spring的Event模式:
2. 核心类:
想要了解Spring的事件模型,需要了解一下几个类:
- ApplicationEvent
事件本身,继承自Java EventObject。 - ApplicationListener
监听者对象,能够选择监听不同的事件Event
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener
- ApplicationEventMulticaster
事件广播,也是Spring事件模型中的核心对象,事件发布-监听依靠的就是ApplicationEventMulticaster,其内部类ListenerRetriever按照不同的ApplicationEvent类型对Listener进行了保存,这也是我们为什么能够针对不同的Event进行过滤从而唤醒Listener的过程。 - ApplicationEventPublisher
事件的发布者,通过ApplicationEventPublisher#publishEvent的方法进行事件的广播。 - ApplicationEventPublisherAware
获取ApplicationEventPublisher的方法。 - AbstractApplicationContext
Spring的事件模型框架的主要逻辑的内容出。AbstractApplicationContext中实现了ApplicationEventPublisher接口,从而实际在Spring中,默认的ApplicationEventPublisher实现就是AbstractApplicationContext。
3. 例子
我们以如下的情况为例:
以问答知乎为例,我们在新增一个问题时,需要异步的去建立索引。而建立索引的过程由于和新增的逻辑没有必然的联系,所以可以通过异步的方式来进行,而这种事件的方式能够很好的进行解耦,或者异步来执行,所以我们可以采用事件驱动来实现。
// QuestionEvent
public class QuestionEvent extends ApplicationEvent {
private String question;
public String getQuestion() {
return question;
}
public void setQuestion(String question) {
this.question = question;
}
public QuestionEvent(String question) {
super(question);
this.question = question;
}
}
// QuestionListener
@Component("questionListener")
public class QuestionListener implements ApplicationListener<QuestionEvent> {
public void onApplicationEvent(QuestionEvent questionEvent) {
System.out.println("index question : " + questionEvent.getQuestion());
}
}
// EventPublisher
@Component(value = "eventPublisher")
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
public ApplicationEventPublisher getApplicationEventPublisher() {
return applicationEventPublisher;
}
}
// EventTest
public class EventTest extends BaseTestNG {
@Resource
private ApplicationEventPublisher applicationEventPublisher;
@Test
public void testEvent() {
String question = "这是一个问题";
applicationEventPublisher.publishEvent(new QuestionEvent(question));
}
}
输出:
index question : 这是一个问题
4. 源码分析:
在分析源码之前,我们首先要明白Event-Listener模式实际就是观察者模式。如果看过之前写过的观察者的模式的文章,将会对源码的内容较容易明白。
首先是Listener的注册:
我们定义了Listener(实现了ApplicationListener),那又是什么时候被注册到BeanFactory中呢,如果了解Spring的Bean的生命周期的话,会知道BeanPostProcess是在AbstractApplicationContext的refresh时候被实例化并初始化的,这个过程是要由于非lazy-init Bean的实例化过程的。而实际,Listener的注册过程也是处在refresh的过程中的,我们来看AbstractApplicationContext#refresh的代码:
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// 初始化传播器
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// 注册监听器
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
// Destroy already created singletons to avoid dangling resources.
destroyBeans();
// Reset 'active' flag.
cancelRefresh(ex);
// Propagate exception to caller.
throw ex;
}
finally {
// Reset common introspection caches in Spring's core, since we
// might not ever need metadata for singleton beans anymore...
resetCommonCaches();
}
}
}
上面代码中,与Event有关的是两个内容,第一个是initApplicationEventMulticaster(),用于初始化事件传播器,而第二个registerListeners(),便是注册监听器,实际就是扫描实现了ApplicationListener接口的类并注册到事件传播器。
我们来看InitApplicationEventMulticaster:
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isDebugEnabled()) {
logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isDebugEnabled()) {
logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
"': using default [" + this.applicationEventMulticaster + "]");
}
}
}
做的内容就是一件事,如果容器中没有name=“applicationEventMulticaster”的类,则认为没有自定义的ApplicationEventMulticaster,此时会自己new一个SimpleApplicationEventMulticaster。
那我们来看看这个默认的广播器的实现SimpleApplicationEventMulticaster,其中核心的方法是multicastEvent,从字面来看是多路广播事件的意思:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
可见multicastEvent是获取对应的Event的listener,并进行调用的,也就是listener.onApplicationEvent(event)。在这里面有点需要注意的是在进行调用的时候,我们会先去获得Executor executor = getTaskExecutor();但是如果我们使用的是默认的Executor的话,则实际executor = null,则此时唤醒的是同步的,如果我们希望采用多线程的方式,则实际需要配置自己事件传播器,并通过setter的方式将executor配置进去。
而关于SimpleApplicationEventMulticaster是如何将Listener配置进去的,下面的代码显示了细节:
public abstract class AbstractApplicationEventMulticaster
implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);
...
private class ListenerRetriever {
public final Set<ApplicationListener<?>> applicationListeners;
public final Set<String> applicationListenerBeans;
private final boolean preFiltered;
public ListenerRetriever(boolean preFiltered) {
this.applicationListeners = new LinkedHashSet<>();
this.applicationListenerBeans = new LinkedHashSet<>();
this.preFiltered = preFiltered;
}
public Collection<ApplicationListener<?>> getApplicationListeners() {
LinkedList<ApplicationListener<?>> allListeners = new LinkedList<>();
for (ApplicationListener<?> listener : this.applicationListeners) {
allListeners.add(listener);
}
if (!this.applicationListenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : this.applicationListenerBeans) {
try {
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (this.preFiltered || !allListeners.contains(listener)) {
allListeners.add(listener);
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
return allListeners;
}
}
...
}
在AbstractApplicationEventMulticaster中定义了一个内部类ListenerRetriever,实际ListenerRetriever对应的是同一类ApplicationEvent的事件监听器,而通过了一个Map当做缓存来取得相应的Listener。
每次获取的时候,会根据EventType和SourceType来生成相应的ListenerCacheKey,从而获得相应的监听器。
5. 定义有序的监听器:
如果我们希望监听器有序的话,实际只要实现SmartApplicationListener接口即可。
6. 参考文章:
详解Spring事件驱动模型
Spring源码