Redis做轻量级消息队列的3种玩法
Redis也可以做轻量级的消息队列:基于List的队列模式、PubSub多播的发布订阅模式、以及5.0之后提供的Stream。
特别是Stream,可以消息持久化、高可用、消息可以指定offset进行反复消费、支持发布订阅按消费组多播、使用消费者的pending_ids机制保证消息传递的可靠性等等,基本已经具有了真正的消息队列中间件的功能了。而PubSub模式在一些对消息传递可靠性和可追溯性不严格的场景(比如内网非一致性的消息通知)也有一定的使用价值。
一、快速入门
1、用list类型模拟队列
典型的队列模式,rpush
右边放入,lpop
左边取出。举例:
127.0.0.1:7001> rpush queue A
-> Redirected to slot [13011] located at 122.51.112.187:7003
(integer) 1
122.51.112.187:7003> rpush queue B
(integer) 2
122.51.112.187:7003> rpush queue C
(integer) 3
122.51.112.187:7003> lpop queue
"A"
122.51.112.187:7003> lpop queue
"B"
122.51.112.187:7003> lpop queue
"C"
122.51.112.187:7003> lpop queue
(nil)
2、PUB|SUB
PubSub解决了list做队列1个消息只能被单个消费者消费的问题,可以1个消息被多个消费组的消费组收到,即所谓的发布订阅模式。
用redis-cli先来用一下:
127.0.0.1:7001> subscribe testTopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testTopic"
3) (integer) 1
返回subscript testTopic 1意思是订阅testTopic成功,然后阻塞等待消息,再开一个终端连接进行publish:
127.0.0.1:7001> publish testTopic msg1
(integer) 1
然后订阅者侧连接收到消息:
1) "message"
2) "testTopic"
3) "msg1"
message testTopic msg1,意思是收到了message,主题是testTopic,内容是msg1
可以同时订阅多个主题,
subscribe testTopic1 testTopic2 testTopic3
也可以按照模式匹配的方式来订阅主题psubscribe
,比如订阅所有以test开头的主题
psubscribe test*
PubSub的消息没有持久化,发布消息之后如果没有消费者、那么消息会直接丢弃,如果消费者刚好此时宕机、重启后也不会收到宕机时发布的消息,所以PubSub来做消息队列的场景十分有限。
Redis5.0之后提供了更强大的Redis Stream数据结构,解决了上述问题。
3、Redis Stream
发布订阅模式,一个消息发布到stream里,多个消费组订阅了这个stream的话都可以收到这个消息,且消息是持久化的。消费组在stream里用last_delivered_id
来定位偏移量、也就是消费到哪个消息了。从设计上看确实跟kafka非常像,Redis作者也说借鉴了kafka。
从数据结构上来说,stream是个链表,节点是消息,消息有ID,消息内容是一系列的k-v对。stream也有消费组和消费者的概念,每个消费者组用last_delivered_id游标指向链表中的节点、来表示消费到哪个ID的消息了,类似kafka里的offset偏移量,每个消费者组里可以有多个消费者,一个消息只会被投递给消费者组里的一个消费者,可以类比RocketMQ里的集群消息。另外,每个消费者内部还有一个 pending_ids
数组,它记录着这个消费者已经被客户端读取了的、但客户端没有回复ACK的消息,由此确保消息至少被消费1次。
添加消息和直接读取消息命令:
xadd testTopic * name douchuzi #向testTopic这个stream添加"name douchuzi"这个消息,*表示消息ID由Redis自动生成
消息ID可以由Redis自动生成,生成的ID类似"1641816045243-0",表示1641816045243这个时间戳这1ms生成的第0个消息。当然消息ID也可以由客户端自己生成。
xlen testTopic #查看有多少消息
xrange testTopic - + #从头到尾返回所有消息
xread count 1 streams testTopic 0 #从ID大于0处开始读取1个消息
xread count 1 block 0 streams testTopic $ #阻塞的从ID大于$处开始读取1个消息,$表示最后一个消息的ID
消费组命令:
xgroup create testTopic group1 $ #创建消费组group1,其last_delivered_id是$
xgroup的完整玩法:
xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
#让消费者consumer1-1从group1消费者组的testTopic stream中拿最新的、并且没有被发送给其他消费者处理的entry
xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >
发送ACK,刚才阻塞xreadgroup之后从别的客户端xadd了1条消息,然后xreadgroup阻塞结束,消息收到。这时候我们看下consumer1-1的pending_ids:
127.0.0.1:7001> xpending testTopic group1 - + 1000 consumer1-1
1) 1) "1641819182032-0"
2) "consumer1-1"
3) (integer) 695470
4) (integer) 1
xpending查询消费组group1中消费者consumer1-1的1000条未收到客户端回复的消息。
接着我们xack回复一下,再看看pending_ids:
127.0.0.1:7001> xack testTopic group1 1641819182032-0
(integer) 1
127.0.0.1:7001> xpending testTopic group1 - + 1000 consumer1-1
(empty array)
可见,xack之后,consumer1-1的pending_ids为空了,Redis Stream用这个办法来确保消息一定被投递。
二、实战开发:基于SpringBoot开发Redis消息队列
下面具体实战一下,用SpringBoot来做Redis的消息队列开发,笔者使用的SpringBoot版本是2.3.7.RELEASE
,其默认的客户端是lettuce 5.3.5.RELEASE,测试所用的Redis为6节点Cluster,可以参照笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)进行搭建。
1、PubSub
发布消息,比较简单,直接用RedisTemplate:
@Service
public class RedisDao {
@Autowired
RedisTemplate<String, Object> redisTemplate;
/**
* PubSub发布消息
* */
public void publishMessage(ChannelTopic channelTopic, String message) {
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
}
订阅消息,使用Spring提供的消息容器RedisMessageListenerContainer
以及消息到达监听接口MessageListener
:
//配置PubSub消息容器
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory());
/**这里可以使用自定义注解来发现所有的MessageHandler,
* 然后循环container.addMessageListener来达到自动配置消息订阅者的目的
* 这样开发只需要编写MessageHandler的实现类就可以了
*/
MessageHandler handler = new MessageHandlerImpl();
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
handler.handleMsg(message);
}
}, handler.getChannelTopic());
return container;
}
MessageHandler接口及其实现是MessageHandlerImpl业务层代码:
/**
* PubSub订阅消息
*
* */
public interface MessageHandler {
//订阅消息到达后的逻辑处理
public void handleMsg(Message msg);
//消息的Topic
public ChannelTopic getChannelTopic();
}
@Slf4j
public class MessageHandlerImpl implements MessageHandler{
@Override
public void handleMsg(Message msg) {
try {
String msgChannel = new String(msg.getChannel(), "utf-8");
String msgBody = new String(msg.getBody(), "utf-8");
log.info("收到消息:");
log.info("Message channel : {}" , msgChannel);
log.info("Message body : {}" , msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public ChannelTopic getChannelTopic() {
return new ChannelTopic("testPubSub");
}
}
另外需要注意,Redis Cluster模式下如果客户端用的是Lettuce,需要配置客户端自适应刷新,在集群主备故障切换的时候、客户端能够自动切换到故障主节点对应的从节点去。详见笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)。
好了,测试一下:
@Slf4j
@SpringBootApplication
public class RedismqApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
RedisDao redisDao = context.getBean(RedisDao.class);
redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子爱豆畜子");
}
}
收到消息:
Message channel : testPubSub
Message body : 肥兔子爱豆畜子
2、Redis Stream
跟PubSub类似,也是需要消息容器MessageContainer、Listener这俩东西。
/**
* 发送消息到指定stream
* */
public void publishStreamMessage(String stream, Object message) {
ObjectRecord<String, String> record = StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
log.info("消息已发送,消息ID:{}" , recordId.getValue());
}
消息监听,实现StreamListener
接口:
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
private StringRedisTemplate redisTemplate;
public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
redisTemplate = stringRedisTemplate;
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
RecordId id = message.getId();
String topic = message.getStream();
String msgBody = message.getValue();
log.info("收到主题{}消息ID={}, 消息内容{}", topic, id.getValue(), msgBody);
String group = "some-service"; //消费组,使用服务名
redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
}
}
配置消息容器,将StreamListener
的实现注册到消息容器StreamMessageListenerContainer
:
@Configuration
public class RedisStreamListenerContainerConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Bean
public StreamMessageListenerContainer redisStreamListenerContainer() {
StreamMessageListenerContainerOptions options =
StreamMessageListenerContainerOptions.builder()
.batchSize(100)
.pollTimeout(Duration.ZERO)
.targetType(String.class)
.build();
StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
String GroupName = "some-service"; //消费组命名一般用服务名
String consumerName = "127.0.0.1:8080"; //消费者命名一般用服务集群下每个节点的ip:port,可以区分是哪个节点消费
String stream = "testTopic"; //stream名称,即topic
container.receive(Consumer.from(GroupName, consumerName),
StreamOffset.create(stream, ReadOffset.lastConsumed()),
new StreamMessageListener(stringRedisTemplate)); //将Listener添加到监听容器
container.start(); //启动消息容器
return container;
}
}
测试:
消息已发送,消息ID:1641872849111-0
收到主题testTopic消息ID=1641872849111-0, 消息内容{"name":"stream-肥兔子爱豆畜子"}
总结说明:
1、为了方便消息的格式笔者统一用了String类型,用fastjson做序列化以后进行传输。
2、StreamListener.onMessage收到消息进行处理以后,手工调用ack进行回复,不然服务端给当前消费者缓存的pending_ids会越来越大、占用内存。
3、消费组笔者一般用服务名来区分,服务下挂多个节点,那么每个节点可以用ip:port作为唯一标识,所以用ip:port作为消费组下的消费组名称。
参考:《Redis深度历险:核心原理与应用实践》
[Redis 的发布订阅功能在 SpringBoot 中的应用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)
[Stream消息队列在SpringBoot中的实践与踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)
[redis — 基于Spring Boot实现Redis stream实时流事件处理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)