Java架构技术栈

技术转载——手把手教你用redis实现一个简单的mq消息队列

2020-06-09  本文已影响0人  若丨寒

写在前面:2020年面试必备的Java后端进阶面试题总结了一份复习指南在Github上,内容详细,图文并茂,有需要学习的朋友可以Star一下!
GitHub地址:https://github.com/abel-max/Java-Study-Note/tree/master

众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.

但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。

为什么是 redis

如何实现

利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2*32-1 条数据,对于大部分应用是完全够用的。

简单来说就是:

注意:代码仅供个人尝鲜使用,请勿用于真实生产环境

代码仅可在 springboot 环境中使用

首先定义注解和接口类

注解代码如下:

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface MqConsumer {    /**     * 队列主题     */    String topic() default "default_es_topic";}

被该注解修饰的类,将会接收 topic 下的消息。

接口代码如下:

public interface RedisConsumer {    /**     * 功能描述: 消费方法,消费者类必须继承此方法     *     * @param message 数据载体     * @author 123     * @date 2020/3/28 22:41     */    void deal(String message);}

本接口用于定于接受消息的处理方法。

扫描注解修饰类

本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。

  1. 扫描部分代码如下:
/** *  MqConfiguration.java */@Overridepublic void run(ApplicationArguments args) {    Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);    map.values().forEach(item -> {        if (!(item instanceof RedisConsumer)) {            log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());            return;        }        MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);        MqConsumer annotation = annotations[0];        String topic = annotation.topic();        if (topicMap.containsKey(topic)) {            log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);        } else {            topicMap.put(topic, (RedisConsumer) item);        }    });    log.info("redis订阅信息汇总完毕!!!!!!");    //由一个线程始终循环获取es队列数据    threadPoolExecutor.execute(loop());}

run 方法在 spring 扫描完毕后调用,通过实现ApplicationRunner接口实现,通过 spring 的方法来获取所有被MqConsumer接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。

  1. 执行线程部分代码如下:
private Runnable loop() {    return () -> {        while (true) {            AtomicInteger count = new AtomicInteger(0);            topicMap.forEach((k, v) -> {                try {                    String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);                    if (message == null) {                        count.getAndIncrement();                    } else {                        pushTask(v, message, k);                    }                } catch (RedisConnectionFailureException connException) {                    log.error("redis无法连接,10s后重试", connException);                    sleep(10);                } catch (Exception e) {                    log.error("redis消息队列异常", e);                }            });            if (count.get() == topicMap.keySet().size()) {                //当所有的队列都为空时休眠1s                sleep(1);            }        }    };}private void pushTask(RedisConsumer item, String value, String key) {    threadPoolExecutor.execute(() -> {        try {            item.deal(value);        } catch (Exception e) {            log.error("执行消费任务出错", e);            //非广播消息进行数据回补            mqUtil.getRedisTemplate().opsForList().rightPush(key, value);        }    });}

loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。

作者:FleyX
链接: https://juejin.im/post/5edf22e3f265da76b93bb1c4

上一篇下一篇

猜你喜欢

热点阅读