Redis(五)-特性-消息队列
2020-11-29 本文已影响0人
进击的蚂蚁zzzliu
概述
消息队列要能支持组件通信消息的快速读写,而Redis本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。另外,消息队列在存取消息时,必须要满足三个需求:
-
消息有序性
:保证先进入队列的消息先消费; -
消息唯一性
:每条消息有一个唯一的标识,消费者可以根据标识来避免该消息被重复消费; -
消息可靠性
:消费者遇到宕机或重启等情况时消息不能丢失;
针对消息队列的需求,本节就来分析下Redis实现消息队列的方案
1. List实现
1.1 命令
BLPOP
:队列为空时阻塞
LPUSH
:队列满时阻塞
BRPOPLPUSH
:取出消费同时保存到另外一个备份list
1.2 实现
消息队列-list-2.png从消息有序性、唯一性、可靠性三个方面分析是否可行
-
消息有序性
:List本身就是按先进先出的顺序对数据进行存取的,已经能满足有序性; -
消息唯一性
:唯一性无法保证,但是可以很容易实现,例如:producer给消息增加唯一前缀/后缀(LPUSH mq 20201129155328:1),consumer消费时截取出唯一标识(20201129155328)判断是否重复消费; -
消息可靠性
:当consumer取出一条消息尚未消费时,consumer发生宕机或其他故障,此时消息将会丢失,因此不满足可靠性;
为了解决可靠性问题可以使用BRPOPLPUSH
消息队列-list-1.png
当consumer故障恢复后可以从备份队列中取出消息进行处理
2. Stream实现
Streams是Redis专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令;
2.1 命令
XADD
:插入消息,消息的格式是键-值对形式,保证有序,可以自动生成全局唯一ID;
XREAD
:用于读取消息,可以按ID读取数据;
XREADGROUP
:按消费组形式读取消息;
XPENDING
:用来查询每个消费组内所有消费者已读取但尚未确认的消息
XACK
:用于向消息队列确认消息处理已完成
2.2 实现
消息队列-stream.png1. XADD demo * key1 5
:返回1606639485099-0,生成唯一id(前部分精确为毫秒数,后部分为该毫秒内序号)2. XREAD block 10000 streams demo 1606639485099-0
:取出1606639485099-0序号以后的所有消息,最长阻塞10000ms3. XGROUP create demo group1 0
:创建一个名为group1的消费组,这个消费组消费的消息队列是demo4. XADD demo * key1 5
:往demo中增加4个k-v5.6.7. XREADGROUP group group1 consumer1 count 1 streams demo >
:让group1中的consumer1、2、3各自读取一条消息其他命令:
XREAD block 10000 streams stream $
:取出最新一条消息,最长阻塞10000msXREADGROUP group group1 consumer1 streams stream >
:让group1消费组里的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取XPENDING stream group1
:查看group1中各个消费者已读取、但尚未确认的消息个数XACK stream group1 1604471086842-0
:group1确认消息1604471086842-0已经消费,这条消息就会被group1删除
2.3 三大需求分析
-
消息有序性
:XADD保证消息有序; -
消息唯一性
:每条消息返回唯一id,保证消息唯一; -
消息可靠性
:Streams会自动使用内部队列(也称为PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用XACK命令通知Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给Streams发送XACK命令,消息仍然会留存。此时,消费者可以在重启后,用XPENDING命令查看已读取、但尚未确认处理完成的消息。
小结
list和streams对比--------over---------