Redis(五)-特性-消息队列

2020-11-29  本文已影响0人  进击的蚂蚁zzzliu

概述

消息队列要能支持组件通信消息的快速读写,而Redis本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。另外,消息队列在存取消息时,必须要满足三个需求:

针对消息队列的需求,本节就来分析下Redis实现消息队列的方案

1. List实现

1.1 命令

BLPOP:队列为空时阻塞
LPUSH:队列满时阻塞
BRPOPLPUSH:取出消费同时保存到另外一个备份list

1.2 实现

消息队列-list-2.png

从消息有序性、唯一性、可靠性三个方面分析是否可行

为了解决可靠性问题可以使用BRPOPLPUSH


消息队列-list-1.png

当consumer故障恢复后可以从备份队列中取出消息进行处理

2. Stream实现

Streams是Redis专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令;

2.1 命令

XADD:插入消息,消息的格式是键-值对形式,保证有序,可以自动生成全局唯一ID;
XREAD:用于读取消息,可以按ID读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING:用来查询每个消费组内所有消费者已读取但尚未确认的消息
XACK:用于向消息队列确认消息处理已完成

2.2 实现

消息队列-stream.png
1. XADD demo * key1 5:返回1606639485099-0,生成唯一id(前部分精确为毫秒数,后部分为该毫秒内序号)
2. XREAD block 10000 streams demo 1606639485099-0:取出1606639485099-0序号以后的所有消息,最长阻塞10000ms
3. XGROUP create demo group1 0:创建一个名为group1的消费组,这个消费组消费的消息队列是demo
4. XADD demo * key1 5:往demo中增加4个k-v
5.6.7. XREADGROUP group group1 consumer1 count 1 streams demo >:让group1中的consumer1、2、3各自读取一条消息
其他命令:
XREAD block 10000 streams stream $:取出最新一条消息,最长阻塞10000ms
XREADGROUP group group1 consumer1 streams stream >:让group1消费组里的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取
XPENDING stream group1:查看group1中各个消费者已读取、但尚未确认的消息个数
XACK stream group1 1604471086842-0:group1确认消息1604471086842-0已经消费,这条消息就会被group1删除

2.3 三大需求分析

小结

list和streams对比

--------over---------

上一篇下一篇

猜你喜欢

热点阅读