Redis 面试宝典之 如何保证 Redis 消息队列中的数据不

2022-01-14  本文已影响0人  you的日常

Redis 最常见的业务场景就是缓存读取与存储,而随着时间的推移,有人开始将它作为消息队列来使用了,并且随着 Redis 版本的发展,在 Redis.2.0.0 中新增了发布订阅模式(Pub/Sub)代表着官方开始正式支持消息队列的功能了,直到今天为止还有部分公司在实现轻量级的消息队列时,依然会选择使用 Redis 来实现。并且消息队列的知识点也会作为一个进阶型的面试题经常出现在面试当中。

我们本文的面试题是,什么是消息队列?为什么要用消息队列?Redis 实现消息队列的方式有几种?如何保证 Redis 消息队列中的数据不丢失?

典型回答

消息队列(Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式,它的实现流程是一方会将消息存储在队列中,而另一方则从队列中读取相应的消息,消息队列提供了异步的通信协议,也就是说消息的发送者和接收者无需同时与消息队列进行交互。

消息队列中有几个重要的概念:

它们的执行流程如下图所示: image.png

使用消息队列有如下好处:

Redis 实现消息队列的方式有四种:

Redis 想要保住消息队列中的数据不丢失,必须要做到以下两点:

  1. 必须开启消息的持久化功能,负责在 Redis 重启之后消息就会全部丢失;
  2. 需要使用 Stream 中提供的消息确认功能,保证消息能够被正常消费。

考点分析

消息队列本身的概念并不难懂,这个就好像买货和卖货一样,最早之前我们卖家和买家是直接面对面进行交易的,但这种方式有很多的局限,比如很难规模话,需要很大的人力成本等,那么随着科技的发展我们可以创建更多的无人超市,而这些无人超市就相当于消息队列中的 channel,卖家相当于生产者,每次只需要负责把货物存放到无人超市就行了,而买家也不用再找卖家直接买货了,只需要每次去无人超市取货就可以了,这样就可以开越来越多的无人超市,即使搞双 11、618 等活动也不怕了,因为那时候就有足够多的无人超市了,这个例子其实就可以用来很好的理解消息队列的核心思路和工作原理,他们之间的关系如下图所示: image.png

理解了消息队列的概念之后,我们重点应该关注的就是消息队列的实现方式了。

和此知识点相关的面试题还有以下这些:

消息队列的具体实现

1.List 版实现方式

我们本文将使用 Java 代码来实现相关的示例,在 Java 项目中我们会使用到 Jedis 框架来作为 Redis 的客户端来进行相关的 Redis 操作,在引用了 Jedis 之后,我们实现代码如下:

import redis.clients.jedis.Jedis;

public class ListMQExample {
    public static void main(String[] args) throws InterruptedException {
        // 消费者
        new Thread(() -> bConsumer()).start();
        // 生产者
        producer();
    }
    /**
     * 生产者
     */
    public static void producer() throws InterruptedException {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // 推送消息
        jedis.lpush("mq", "Hello, List.");
        Thread.sleep(1000);
        jedis.lpush("mq", "message 2.");
        Thread.sleep(2000);
        jedis.lpush("mq", "message 3.");
    }
    /**
     * 消费者(阻塞版)
     */
    public static void bConsumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        while (true) {
            // 阻塞读
            for (String item : jedis.brpop(0,"mq")) {
                // 读取到相关数据,进行业务处理
                System.out.println(item);
            }
        }
    }
}

以上程序的运行结果是:

接收到消息:Hello, List.

我们使用无限循环来获取队列中的数据,这样就可以实时的获取相关信息了,其中 brpop() 方法的第一个参数是设置超时时间的,设置 0 表示一直阻塞,也就是说当队列没有数据时,while 循环就会进入休眠状态,当有数据进入队列之后,它才会“苏醒”过来执行读取任务,这样就可以解决 while 循环一直执行消耗系统资源的问题了。

ZSet 的实现方式和 List 实现方式类似,它主要借助的是 zadd 和 zrangebyscore 来实现存入和读取,这里就不再进行赘述了。

2.Pub/Sub 实现方式

此模式是 Redis 提供的发布订阅模式,它的相关实现代码如下。

消费者代码如下:

/**
 * 消费者
 */
public static void consumer() {
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    // 接收并处理消息
    jedis.subscribe(new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            // 接收消息,业务处理
            System.out.println("频道 " + channel + " 收到消息:" + message);
        }
    }, "channel");
}

生产者代码如下:

/**
 * 生产者
 */
public static void producer() {
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    // 推送消息
    jedis.publish("channel", "Hello, channel.");
}

发布者和订阅者模式运行:

public static void main(String[] args) throws InterruptedException {
    // 创建一个新线程作为消费者
    new Thread(() -> consumer()).start();
    // 暂停 0.5s 等待消费者初始化
    Thread.sleep(500);
    // 生产者发送消息
    producer();
}

以上代码运行结果如下:

频道 channel 收到消息:Hello, channel.

此模式提供了主题订阅的模式,例如我要订阅日志类的消息队列,它们的命名都是 logXXX,这个时候就需要使用 Redis 提供的另一个功能 Pattern Subscribe 主题订阅,这种方式可以使用 “*” 来匹配多个频道,如下图所示:

image.png

它的相关实现代码如下。

上一篇 下一篇

猜你喜欢

热点阅读