Redis到底适不适合做消息队列?

2022-01-18  本文已影响0人  爱钓鱼的码农

List队列模型

因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。
生产者使用 LPUSH 发布消息:

127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

消费者这一侧,使用 RPOP 拉取消息:

127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

while true:
    msg = redis.rpop("queue")
    // 没有消息,继续循环
    if msg == null:
        continue
    // 处理消息
    handle(msg)
如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

while true:
    msg = redis.rpop("queue")
    // 没有消息,休眠2s
    if msg == null:
        sleep(2)
        continue
    // 处理消息        
    handle(msg)

这就解决了 CPU 空转问题。这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。
那如何做,既能及时处理新消息,还能避免 CPU 空转呢?
Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?
幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

List队列的缺点:

发布/订阅模型:Pub/Sub

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时,2 个消费者都会被阻塞住,等待新消息的到来。之后,再启动一个生产者,发布一条消息。这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

127.0.0.1:6379> PUBLISH queue msg1
//发送新消息
(integer) 1

127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"

使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。讲完了它的优点,那它有什么缺点呢?其实,Pub/Sub 最大问题是:丢数据。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。
一个完整的发布、订阅消息处理流程是这样的:
消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它,整个过程中,没有任何的数据存储,一切都是实时转发的。

Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时有可能会导致消费失败和消息丢失。
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。之后,消费者不断地从缓冲区读取消息,处理消息。
因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。
client-output-buffer-limit pubsub 32mb 8mb 60

二者的区别

List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

Stream模型

在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。

生产者发布 2 条消息:
使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID,这个消息 ID 的格式是「时间戳-自增序号」。
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

消费者拉取消息:
从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"


如果想继续拉取消息,需要传入上一条消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
没有消息,Redis 会返回 NULL。

Stream 是否支持「阻塞式」拉取消息?

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

Stream 是否支持发布 / 订阅模式?

生产者发布 2 条消息:
127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

2 组消费者处理同一批数据,就需要创建 2 个消费者组:
// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:
// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"
第二个消费组开始消费:



// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。
当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"

Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。我们只需要配置好持久化策略,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

消息堆积时,Stream 是怎么处理的?

当消息队列发生消息堆积时,一般只有 2 个解决方案:

而 Redis 在实现 Stream 时,采用了第 2 个方案。在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

到底Redis适不适合做消息队列?

核心目标就是不允许数据丢失,消息是否会发生丢失,其重点也就在于以下 3 个环节:

生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

消费者会不会丢消息?

消费者拿到消息后,还没处理完成就异常宕机了,那消费者还能否重新消费失败的消息?
要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。
所以,从这个角度来看,Redis 也是合格的。

队列中间件会不会丢消息?

在这个方面,Redis 其实没有达到要求。Redis 在以下 2 个场景下,都会导致数据丢失。

消息积压怎么办?

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。
所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

消息队列方案选型

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

上一篇 下一篇

猜你喜欢

热点阅读