Spring ApplicationContext事件机制及使用
Spring中提供的标准事件:
-
ContextRefreshEvent,当ApplicationContext容器初始化完成或者被刷新的时候,就会发布该事件。比如调用ConfigurableApplicationContext接口中的refresh()方法。此处的容器初始化指的是所有的Bean都被成功装载,后处理(post-processor)Bean被检测到并且激活,所有单例Bean都被预实例化,ApplicationContext容器已经可以使用。只要上下文没有被关闭,刷新可以被多次触发。XMLWebApplicationContext支持热刷新,GenericApplicationContext不支持热刷新。
-
ContextStartedEvent,当ApplicationContext启动的时候发布事件,即调用ConfigurableApplicationContext接口的start方法的时候。这里的启动是指,所有的被容器管理生命周期的Bean接受到一个明确的启动信号。在经常需要停止后重新启动的场合比较适用。
-
ContextStoppedEvent,当ApplicationContext容器停止的时候发布事件,即调用ConfigurableApplicationContext的close方法的时候。这里的停止是指,所有被容器管理生命周期的Bean接到一个明确的停止信号。
-
ContextClosedEvent,当ApplicationContext关闭的时候发布事件,即调用ConfigurableApplicationContext的close方法的时候,关闭指的是所有的单例Bean都被销毁。关闭上下后,不能重新刷新或者重新启动。
-
RequestHandledEvent,只能用于DispatcherServlet的web应用,Spring处理用户请求结束后,系统会触发该事件。
注册到ZooKeeper
@Slf4j
@Component
public class RegisterZkListener implements ApplicationListener<ContextRefreshedEvent> {
private ZkRegister zr;
@Value("${zk.ip:}")
public String zkHost;
@Value("${register.zk.name}")
public String registerZkName;
@Value("${register.zk.port:8080}")
public int registerZkPort;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.debug("onApplicationEvent is start");
if (zr == null) {
zr = new ZkRegister(zkHost);
}
zr.register(registerZkName, registerZkPort);
}
}
缓存同步(自定义事件)
- 自定义事件类
CacheChangedEvent.java
@Getter
public class CacheChangedEvent extends ApplicationEvent {
private final Set<Integer> blogIds;
public CacheChangedEvent(Object source, Set<Integer> blogIds) {
super(source);
this.blogIds = blogIds;
}
}
- 事件使用
TenmaoFlowManager.java
public class TenmaoFlowManager implements DistCacheObserver, ApplicationListener<CacheChangedEvent> {
enum CacheState {
INITIALIZING,
BROADCASTING
}
private volatile CacheState cacheState = CacheState.INITIALIZING;
@Override
public void onApplicationEvent(CacheChangedEvent event) {
//使用两次校验,提高性能
if (cacheState == CacheState.INITIALIZING) {
synchronized (this) {
if (cacheState == CacheState.INITIALIZING) {
log.info("add to cache events: {}", event.getblogIds());
lastEvent = event.getblogIds();
return;
}
}
}
doHandle(event.getblogIds());
}
private void doHandle(Set<Integer> blogIds) {
//todo
}
}
- 事件发布
DistCacheManager.java
public class DistCacheManager {
private static final String CACHE_CHANNEL = "channel.blog.tenmao.cache";
private static final String CACHE_KEY = "blog.tenmao.cache.tasks";
static {
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("dist cache %d").build();
THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100), factory);
}
@Resource
private ApplicationContext applicationContext;
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;
@Resource
private JedisCluster jedis;
private final JedisPubSub jedisPubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
Set<Integer> blogIds = jedis.smembers(Tenmao_CACHE_KEY).stream().map(Integer::parseInt).collect(Collectors.toSet());
log.info("get message: channel[{}], message[{}], blogIds[{}]", channel, message, blogIds);
synchronized (this) {
applicationContext.publishEvent(new CacheChangedEvent(this, blogIds));
}
}
};
}