Redis做消息队列

2022-12-06  本文已影响0人  轻轻敲醒沉睡的心灵

消息队列(Message Queue)是一种应用间的通信方式,是为了方便业务场景中数据的传输和处理。

1. 最简单的:使用 List队列

当业务需求比较简单的时候,可以使用List来实现消息队列。
因为 List 底层的实现就是一个链表,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。
Redis中也提供了方法来操作数据,可以左进右出,右进左出

127.0.0.1:6379> lpush testList msg1
1
127.0.0.1:6379> lpush testList msg2
2
127.0.0.1:6379> rpop testList
msg1
127.0.0.1:6379> rpop testList
msg2
127.0.0.1:6379> rpop testList

127.0.0.1:6379>

写代码时,一般会用一个死循环,不停的去拉取。
但此时会有一个问题,当队列为空时,消费者依旧会频繁拉取消息,这会造成CPU 空转,不仅浪费 CPU 资源,还会对 Redis 造成压力。
当然我们可以不用死循环,用个定时任务来拉取数据,但当有连续大量数据进来时,就不能连续消费了,而是要等定时任务才能消费。
其实Redis也给出了其他方法来处理CPU 空转的情况:BRPOP / BLPOP,这里的 B 指的是阻塞(Block),即阻塞式拉取消息。实现原理是:如果队列为空,消费者在拉取消息时就阻塞等待,一旦有新消息过来,就通知消费者立即处理新消息。使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个超时时间,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。设置了超时时间,可以很好的解决Redis连接超时的问题。
这时用List实现消息队列是这样的:

while (true) {
    try {
        Object object = redisTemplate.opsForList().rightPop("test", Duration.ofSeconds(60));
        if (object == null) {
            // ThreadUtil.sleep(60 * 1000);
            continue;
        }
        handle(object);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

此时,redis的配置中, 连接超时时间timeout: 100000(毫秒) 要大于上面 一个循环的时间,否则有可能报错。
这就是最简单的消息队列模型,简单业务中可以使用,毕竟比搭建专门的消息队列省了很多事情和成本。
List队列模型中,虽然生产者的消息可以持久化的,但还存在着2个问题:

2. 发布/订阅模型:Pub/Sub

发布订阅模型,可以满足多个消费者的条件,流程示意图如下(来源于网络):


image.png

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。但是需要注意:

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者。同时Redis会在 Server 上给这个消费者在分配一个缓冲区,这个缓冲区其实就是一块内存,缓冲区大小可调节;
  2. 生产者向这个队列发布消息,Redis会先把消息写到对应消费者的缓冲区中,消费者在不断地从缓冲区读取消息,处理消息
    因为有了这个缓冲区,才会发生消息堆积。当消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。如果超过了缓冲区配置的上限,此时,Redis 就会强制把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。
    Redis 的配置文件中,这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60
    它的参数含义如下:

可以看出,Redis的消费/订阅模式,除了支持多消费者和多生产者以外,其他的还不如List模型呢,不能持久化、消息同样丢失、还多了消息堆积

3. Redis中的Stream

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中
我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
Stream是专门为消息队列增加的,它也有命令来实现消息队列的功能。

发布

// 使用 XADD 命令发布消息,其中的*表示让 Redis 自动生成唯一的消息 ID
// 这个消息 ID 的格式是时间戳-自增序号
127.0.0.1:6379> XADD testStr * name zhangsan
1670255186379-0
127.0.0.1:6379> XADD testStr * name lisi
1670255207117-0
127.0.0.1:6379>

消费拉取

// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS testStr 0-0
testStr
1670255186379-0
name
zhangsan
1670255207117-0
name
lisi
// 继续拉取消息,需要传入上一条消息的 ID:
127.0.0.1:6379> xread count 2 streams testStr 1670255207117-0

127.0.0.1:6379>

阻塞拉取消费
Stream 可以 实现阻塞拉取,添加BLOCK参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> xread count 2 BLOCK 0 streams testStr 1670255207117-0

// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE testStr group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE testStr group2 0-0
OK
127.0.0.1:6379>
// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS testStr >
testStr
1670255186379-0
name
zhangsan
1670255207117-0
name
lisi
// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS testStr >
testStr
1670255186379-0
name
zhangsan
1670255207117-0
name
lisi
127.0.0.1:6379>

Stream做消息队列数据固化了,从生产者和组件本身来说不会丢失了,同时它还有其他的设计,弥补了ListRedis发布/订阅的缺点。

  1. Stream不会丢失数据了,因为像其他消息队列一样,添加了ACK机制,每个消息都有自己的ID,当消费者处理完数据以后,会执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为处理完成。如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。
  2. 消息堆积的处理,当消息队列发生消息堆积时,一般只有 2 个解决方案:
    • 生产者限流:避免消费者处理不及时,导致持续积压
    • 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息
      而 Redis 在实现 Stream 时,采用了第 2 个方案。
      在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
// 队列长度最大10000
127.0.0.1:6379> XADD str2 MAXLEN 10000 * name zhangsan
1670257154826-0
127.0.0.1:6379>

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

  1. Stream 还支持查看消息长度XLEN、查看消费者状态XINFO等命令,使用比较简单,可以查询官方文档了解一下

Redis在做的越来越好,但是就Redis本身的设计来说,比如AOF持久化机制,使Redis组件本身还存在丢数据的可能,消息堆积时,消费者也可能丢失消息。因此Redis和专业消息队列还是有差别的。
但是,Redis是轻量化的组件,作为缓存存储项目中经常用到,此时在业务需求简单且允许的情况下拿它来做消息队列还是很不错的。

上一篇 下一篇

猜你喜欢

热点阅读