Spring ApplicationContext事件机制及使用

2019-03-26  本文已影响0人  十毛tenmao

Spring中提供的标准事件:

注册到ZooKeeper

@Slf4j
@Component
public class RegisterZkListener implements ApplicationListener<ContextRefreshedEvent> {
    private ZkRegister zr;
    @Value("${zk.ip:}")
    public String zkHost;
    @Value("${register.zk.name}")
    public String registerZkName;
    @Value("${register.zk.port:8080}")
    public int registerZkPort;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        log.debug("onApplicationEvent is start");
        if (zr == null) {
            zr = new ZkRegister(zkHost);
        }
        zr.register(registerZkName, registerZkPort);
    }

}

缓存同步(自定义事件)

@Getter
public class CacheChangedEvent extends ApplicationEvent {
    private final Set<Integer>  blogIds;
    public CacheChangedEvent(Object source, Set<Integer> blogIds) {
        super(source);
        this.blogIds = blogIds;
    }
}
public class TenmaoFlowManager implements DistCacheObserver, ApplicationListener<CacheChangedEvent> {
    enum CacheState {
        INITIALIZING,
        BROADCASTING
    }

    private volatile CacheState cacheState = CacheState.INITIALIZING;

    @Override
    public void onApplicationEvent(CacheChangedEvent event) {
        //使用两次校验,提高性能
        if (cacheState == CacheState.INITIALIZING) {
            synchronized (this) {
                if (cacheState == CacheState.INITIALIZING) {
                    log.info("add to cache events: {}", event.getblogIds());
                    lastEvent = event.getblogIds();
                    return;
                }
            }
        }
        doHandle(event.getblogIds());
    }
    private void doHandle(Set<Integer> blogIds) {
        //todo
    }
}
public class DistCacheManager {
    private static final String CACHE_CHANNEL = "channel.blog.tenmao.cache";
    private static final String CACHE_KEY = "blog.tenmao.cache.tasks";

    static {
        ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("dist cache %d").build();
        THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100), factory);
    }

    @Resource
    private ApplicationContext applicationContext;
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;

    @Resource
    private JedisCluster jedis;

    private final JedisPubSub jedisPubSub = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            super.onMessage(channel, message);
            Set<Integer> blogIds = jedis.smembers(Tenmao_CACHE_KEY).stream().map(Integer::parseInt).collect(Collectors.toSet());
            log.info("get message: channel[{}], message[{}], blogIds[{}]", channel, message, blogIds);
            synchronized (this) {
                applicationContext.publishEvent(new CacheChangedEvent(this, blogIds));
            }
        }
    };
}

参考

上一篇下一篇

猜你喜欢

热点阅读