How to Publish and Subscribe with a Message Queue [Easily Extensible]

2019-06-27  逸如风飞

Implementing the Redis message queue

Define the message-receiver annotation

@Component
@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisTopic {
    String value();
}

Define the message-receiver interface

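public interface RedisTopicInterface<T> {
    /** Prefix shared by every channel name. */
    String PREX = "houdamis_";

    /**
     * Returns the channel (topic) this receiver listens on.
     *
     * @return the channel name
     */
    String getChannel();

    /**
     * Handles a message received on the given channel.
     *
     * @param message the message payload
     * @param channel the channel the message arrived on
     * @return true if the message was handled
     */
    boolean receiveMsg(T message, String channel);

    /**
     * Tells whether this receiver handles the given channel.
     *
     * @param channel the channel name
     * @return true if this receiver handles the channel
     */
    boolean isReceiver(String channel);
}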

Example message receiver

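@RedisTopic(DataSourceReloadService.TOPIC)
public class DataSourceReloadService implements RedisTopicInterface<Object> {
    /**
     * Required: the topic (channel) this receiver subscribes to.
     */
    public static final String TOPIC = RedisTopicInterface.PREX + "dbsync";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public String getChannel() {
        return TOPIC;
    }

    /**
     * Handles a received message.
     *
     * @param message the message payload
     * @param channel the channel the message arrived on; equals TOPIC for this receiver
     * @return true if the message was handled, false if it was ignored or handling failed
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if (!isReceiver(channel)) {
            return false;
        }
        try {
            // TODO: business logic goes here
            return true;
        } catch (Exception e) {
            // Passing the exception as the last argument also logs the stack trace
            logger.error("{} failed to handle message: {}", TOPIC, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Tells whether this receiver handles the given channel.
     *
     * @param channel the channel name
     * @return true if the channel equals TOPIC
     */
    @Override
    public boolean isReceiver(String channel) {
        // equals() is false for a null or empty channel, so no separate check is needed
        return TOPIC.equals(channel);
    }
}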

Message queue utility class

public class RedisTopicUtils {
    private static RedisTemplate redisTemplate = SpringContextHolder.getBean("redisTemplate");
    private static ConcurrentHashMap<String , Set<RedisTopicInterface>> topicBeans = new ConcurrentHashMap<String, Set<RedisTopicInterface>>();

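    /**
     * Publishes a message.
     *
     * @param channel the channel to publish to
     * @param message the message payload
     * @return true once the message has been handed to Redis
     */
    public static boolean sendMessage(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
        return true;
    }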

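    /**
     * Dispatches a received message to every receiver registered for the channel.
     *
     * @param channel the channel the message arrived on
     * @param message the message payload
     */
    public static void receiveMessage(String channel, Object message) {
        Set<RedisTopicInterface> beans = getReceiver(channel);
        for (RedisTopicInterface v : beans) {
            try {
                v.receiveMsg(message, channel);
            } catch (Throwable e) {
                // Keep going so that one failing receiver does not block the others
                e.printStackTrace();
            }
        }
    }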

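    /**
     * Registers a receiver under the topic declared in its @RedisTopic annotation.
     *
     * @param receiver the receiver to register
     */
    public static void regist(RedisTopicInterface receiver) {
        String key = receiver.getClass().getAnnotation(RedisTopic.class).value();
        // computeIfAbsent creates the set atomically, so concurrent registrations are not lost
        topicBeans.computeIfAbsent(key, k -> ConcurrentHashMap.<RedisTopicInterface>newKeySet()).add(receiver);
    }

    /**
     * Returns the receivers registered for a topic, or an empty set if there are none.
     *
     * @param key the topic name
     */
    public static Set<RedisTopicInterface> getReceiver(String key) {
        Set<RedisTopicInterface> beans = topicBeans.get(key);
        // Collections is java.util.Collections
        return beans != null ? beans : Collections.<RedisTopicInterface>emptySet();
    }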

    /**
     * Finds every bean annotated with @RedisTopic and registers it.
     */
    public static void init() {
        try {
            Map<String, Object> serviceBeanMap = SpringContextHolder.getApplicationContext().getBeansWithAnnotation(RedisTopic.class);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String beanClass = serviceBean.getClass().getName();
                    RedisTopic topic = serviceBean.getClass().getAnnotation(RedisTopic.class);
                    if (topic != null && serviceBean instanceof RedisTopicInterface) {
                        regist((RedisTopicInterface) serviceBean);
                        System.out.println(beanClass + " registered to RedisTopic[" + topic.value() + "]");
                    } else {
                        // The annotation is not visible on the runtime class, or the interface is not implemented
                        System.err.println(beanClass + " can't be registered to RedisTopic");
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }


}
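
regist() and init() above find a receiver's topic by reading its annotation through reflection. The following is a minimal, Spring-free sketch of that lookup. The names Topic, AnnotatedReceiver, PlainReceiver, SubclassOfAnnotated and TopicLookupSketch are hypothetical and exist only for this illustration; Topic stands in for @RedisTopic without @Component.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical stand-in for @RedisTopic (no Spring @Component here). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Topic {
    String value();
}

@Topic("houdamis_dbsync")
class AnnotatedReceiver { }

/** No annotation at all. */
class PlainReceiver { }

/** Subclass of an annotated class; Topic is not @Inherited, so the lookup misses it. */
class SubclassOfAnnotated extends AnnotatedReceiver { }

class TopicLookupSketch {
    /** Returns the topic declared directly on the bean's class, or null if absent. */
    static String topicOf(Object bean) {
        Topic topic = bean.getClass().getAnnotation(Topic.class);
        return topic == null ? null : topic.value();
    }

    public static void main(String[] args) {
        System.out.println(topicOf(new AnnotatedReceiver()));   // houdamis_dbsync
        System.out.println(topicOf(new PlainReceiver()));       // null
        System.out.println(topicOf(new SubclassOfAnnotated())); // null
    }
}
```

The third case is why a null check before calling value() is worthwhile: if the runtime class of a bean is a subclass of the annotated class (which may be the case for a class-based proxy), getAnnotation() returns null unless the annotation is declared @Inherited. Spring's AnnotationUtils.findAnnotation(), which also searches superclasses, is one possible alternative.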

Message queue: registering receivers

Receivers are registered when the application starts.

@Service
public class DataSourceInitListener implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent evt) {
        
        RedisTopicUtils.init();
    }
}

Redis XML configuration

<!-- Message listener bean -->
    <bean id="redisMessageListener" class="com.thinkgem.jeesite.common.redis.topic.RedisMessageListener">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>
    <!-- Listener container -->
    <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
          destroy-method="destroy">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <!-- Task executor -->
        <property name="taskExecutor">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
                <property name="poolSize" value="10"/>
            </bean>
        </property>
        <!-- Message listeners: the pattern "*" subscribes to every channel -->
        <property name="messageListeners">
            <map>
                <entry key-ref="redisMessageListener">
                    <list>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>

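Message listening

When RedisMessageListener receives a message, it hands the message to RedisTopicUtils. RedisTopicUtils looks up the Set<RedisTopicInterface> registered for that topic and calls each RedisTopicInterface in the set to handle the message.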


/**
 * Receives messages from Redis and hands them to RedisTopicUtils.
 */
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        // convertAndSend() writes the channel name with the template's string serializer,
        // so read it back with the same serializer rather than the key serializer
        String msgChannel = getRedisTemplate().getStringSerializer().deserialize(channel);
        RedisTopicUtils.receiveMessage(msgChannel, getRedisTemplate().getValueSerializer().deserialize(body));
    }

    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
}
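
The listen-then-dispatch flow described in this section can be tried out without Redis or Spring. The sketch below keeps only the registry and the fan-out loop, in memory. TopicReceiver, RecordingReceiver and TopicDispatchSketch are hypothetical names for this illustration and are not part of the original code; TopicReceiver is a cut-down stand-in for RedisTopicInterface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, cut-down stand-in for RedisTopicInterface. */
interface TopicReceiver {
    String getChannel();
    boolean receiveMsg(Object message, String channel);
}

/** Test receiver that records every message it is given. */
class RecordingReceiver implements TopicReceiver {
    final String channel;
    final List<Object> received = new ArrayList<>();

    RecordingReceiver(String channel) {
        this.channel = channel;
    }

    @Override
    public String getChannel() {
        return channel;
    }

    @Override
    public boolean receiveMsg(Object message, String channel) {
        received.add(message);
        return true;
    }
}

/** In-memory sketch of the registry and dispatch role played by RedisTopicUtils. */
class TopicDispatchSketch {
    private final Map<String, Set<TopicReceiver>> receivers = new ConcurrentHashMap<>();

    /** Adds the receiver to the set kept for its channel, creating the set if needed. */
    void register(TopicReceiver receiver) {
        receivers.computeIfAbsent(receiver.getChannel(),
                k -> ConcurrentHashMap.<TopicReceiver>newKeySet()).add(receiver);
    }

    /** Calls every receiver registered for the channel; returns how many reported success. */
    int dispatch(String channel, Object message) {
        int handled = 0;
        Set<TopicReceiver> targets =
                receivers.getOrDefault(channel, Collections.<TopicReceiver>emptySet());
        for (TopicReceiver receiver : targets) {
            try {
                if (receiver.receiveMsg(message, channel)) {
                    handled++;
                }
            } catch (RuntimeException e) {
                // One failing receiver must not stop delivery to the others
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        TopicDispatchSketch dispatcher = new TopicDispatchSketch();
        RecordingReceiver dbsync = new RecordingReceiver("houdamis_dbsync");
        dispatcher.register(dbsync);
        int handled = dispatcher.dispatch("houdamis_dbsync", "reload");
        System.out.println(handled + " receiver(s) handled " + dbsync.received);
        // prints: 1 receiver(s) handled [reload]
    }
}
```

Keeping the registry keyed by channel means a new subscriber only has to register itself; the dispatch loop does not change, which is the "easily extensible" property the title refers to.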

Publishing a message

        // Publish a message
        RedisTopicUtils.sendMessage(DataSourceReloadService.TOPIC, "this is an xx message");

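How to use the message queue to publish and subscribe

Subscribing to messages

Add a new class that implements RedisTopicInterface, annotate it with @RedisTopic, and set TOPIC to the topic it should receive. The interface has three methods; getChannel() and isReceiver() can be copied from the example code as-is.
The method that needs real work is receiveMsg().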

@RedisTopic(AAAService.TOPIC)
public class AAAService implements RedisTopicInterface<Object> {
    /** The topic (channel) this receiver subscribes to. */
    public static final String TOPIC = RedisTopicInterface.PREX + "dbsync";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Handles a received message.
     *
     * @param message the message payload
     * @param channel the channel the message arrived on
     * @return true if the message was handled, false if it was ignored or handling failed
     */
    @Override
    public boolean receiveMsg(Object message, String channel) {
        if (!isReceiver(channel)) {
            return false;
        }
        try {
            // TODO: implement the business logic here
            return true;
        } catch (Exception e) {
            logger.error("{} failed to handle message: {}", TOPIC, e.getMessage(), e);
            return false;
        }
    }

    @Override
    public String getChannel() {
        return TOPIC;
    }

    /**
     * Tells whether this receiver handles the given channel.
     *
     * @param channel the channel name
     * @return true if the channel equals TOPIC
     */
    @Override
    public boolean isReceiver(String channel) {
        return TOPIC.equals(channel);
    }
}

Publishing a message

       // Publish a message
       RedisTopicUtils.sendMessage(AAAService.TOPIC, "this is an xx message");