如何使用消息队列发布与订阅【易扩展】
2019-06-27 本文已影响0人
逸如风飞
redis消息队列实现
定义消息接受注解
@Component
@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisTopic {
String value();
}
定义消息接受interface
public interface RedisTopicInterface<T> {
public final static String PREX = "houdamis_";
/**
* 发布消息
* @return
*/
public String getChannel();
/**
* 接受消息
* @return
*/
public boolean receiveMsg(T message,String channel);
/**
* 接受消息
* @return
*/
public boolean isReceiver( String channel);
}
消息接受代码示例
@RedisTopic(value = TOPIC)
public class DataSourceReloadService implements RedisTopicInterface {
/**
必须参数
* 定义消息队列主题
*/
public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
public final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String getChannel( ) {
return TOPIC;
}
/**
* 接受消息
*
* @param message 消息内容
* @param channel 消息主题 同topic
* @return
*/
@Override
public boolean receiveMsg(Object message, String channel) {
if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
try {
// TODO 进行业务处理
xxxx
} catch (Exception e) {
e.printStackTrace();
logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
}
}
return false;
}
/**
* 接受消息
*
* @param channel
* @return
*/
@Override
public boolean isReceiver(String channel) {
if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
return true;
}
return false;
}
}
消息队列工具类
public class RedisTopicUtils {
private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();
/**
* 发布消息
* @param channel
* @param messgae
* @return
*/
public static boolean sendMessage(String channel ,Object messgae){
redisTemplate.convertAndSend(channel, messgae);
return true;
}
/**
* 接受消息
* @param channel
* @param messgae
*/
public static void receiveMessage(String channel, Object messgae) {
Set<RedisTopicInterface> beans = getReceiver(channel);
for (RedisTopicInterface v : beans) {
try {
v.receiveMsg(messgae,channel);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
/**
* 注册RedisTopicInterface
* @param receiver
*/
public static void regist(RedisTopicInterface receiver){
String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
Set<RedisTopicInterface> beans = getReceiver(key);
beans.add(receiver);
topicBeans.put(key,beans);
}
public static Set<RedisTopicInterface> getReceiver(String key){
Set<RedisTopicInterface> beans = topicBeans.get(key);
if(beans == null){
beans = Sets.newHashSet();
}
return beans;
}
/**
* 初始化
*/
public static void init(){
try {
Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RedisTopic.class).value();
if(null != interfaceName){
regist( (RedisTopicInterface) serviceBean);
System.out.println(serviceBean.getClass().getName()+ " regist to RedisTopic[" + interfaceName+"]");
}else{
System.err.println(serviceBean.getClass().getName()+ "can't regist to RedisTopic[" + interfaceName+"]");
}
}
}
} catch ( Throwable e) {
e.printStackTrace();
}
}
}
消息队列-接受者注册
系统启动时进行注册
@Service
public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent evt) {
RedisTopicUtils.init();
}
}
redis xml配置
<!-- 定义监听类 -->
<bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
<property name="redisTemplate" ref="redisTemplate"/>
</bean>
<!-- 定义监听容器 -->
<bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
destroy-method="destroy">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<!-- 任务执行器 -->
<property name="taskExecutor">
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="10"/>
</bean>
</property>
<!-- 消息监听器 -->
<property name="messageListeners">
<map>
<entry key-ref="redisMessageListener">
<list>
<bean class="org.springframework.data.redis.listener.PatternTopic">
<constructor-arg value="*" />
</bean>
</list>
</entry>
</map>
</property>
</bean>
消息监听
RedisMessageListener监听到消息之后,交由RedisTopicUtils处理,RedisTopicUtils根据topic找到已注册的Set<RedisTopicInterface>,然后通知每个RedisTopicInterface元素进行处理。
/**
* redis消息接受
*/
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String msgChannel = (String) getRedisTemplate().getKeySerializer().deserialize(channel);
RedisTopicUtils.receiveMessage(msgChannel,getRedisTemplate().getValueSerializer().deserialize(body));
}
public RedisTemplate<String, Object> getRedisTemplate() {
return redisTemplate;
}
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
}
消息发布
// 发布 消息
RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, “这是xx消息”;
如何使用消息队列发布与订阅
订阅消息
新增代码 实现RedisTopicInterface,添加@RedisTopic注解,并设置TOPIC对应的主题,实现3个方法(有2个直接使用示例代码即可)
主要实现receiveMsg()方法。
@RedisTopic(value = TOPIC)
public class AAAService implements RedisTopicInterface {
/** * 定义消息队列主题 */
public final static String TOPIC = RedisTopicInterface.PREX + "dbsync";
/**
* 接受消息
* @param message
* @param channel
* @return
*/
@Override
public boolean receiveMsg(Object message, String channel) {
if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
try {
// TODO 进行业务处理
此处实现
} catch (Exception e) {
e.printStackTrace();
logger.error("{} 接受消息失败:{}",TOPIC ,e.getMessage());
}
}
return false;
}
@Override
public String getChannel( ) {
return TOPIC;
}
/**
* 接受消息
*
* @param channel
* @return
*/
@Override
public boolean isReceiver(String channel) {
if(StringUtils.isNotEmpty(channel) && TOPIC.equals(channel)){
return true;
}
return false;
}
发布消息
// 发布 消息
RedisTopicUtils.sendMessage(AAAService.TOPIC, “这是xx消息”;