redis

Redis做轻量级消息队列的3种玩法

2022-01-11  本文已影响0人  肥兔子爱豆畜子

Redis也可以做轻量级的消息队列:基于List的队列模式、PubSub多播的发布订阅模式、以及5.0之后提供的Stream。
特别是Stream,可以消息持久化、高可用、消息可以指定offset进行反复消费、支持发布订阅按消费组多播、使用消费者的pending_ids机制保证消息传递的可靠性等等,基本已经具有了真正的消息队列中间件的功能了。而PubSub模式在一些对消息传递可靠性和可追溯性不严格的场景(比如内网非一致性的消息通知)也有一定的使用价值。

一、快速入门

1、用list类型模拟队列

典型的队列模式,rpush右边放入,lpop左边取出。举例:

127.0.0.1:7001> rpush queue A
-> Redirected to slot [13011] located at 122.51.112.187:7003
(integer) 1
122.51.112.187:7003> rpush queue B
(integer) 2
122.51.112.187:7003> rpush queue C
(integer) 3
122.51.112.187:7003> lpop queue
"A"
122.51.112.187:7003> lpop queue
"B"
122.51.112.187:7003> lpop queue
"C"
122.51.112.187:7003> lpop queue
(nil)
2、PUB|SUB

PubSub解决了list做队列1个消息只能被单个消费者消费的问题,可以1个消息被多个消费组的消费组收到,即所谓的发布订阅模式。

用redis-cli先来用一下:

127.0.0.1:7001> subscribe testTopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testTopic"
3) (integer) 1

返回subscript testTopic 1意思是订阅testTopic成功,然后阻塞等待消息,再开一个终端连接进行publish:

127.0.0.1:7001> publish testTopic msg1
(integer) 1

然后订阅者侧连接收到消息:

1) "message"
2) "testTopic"
3) "msg1"

message testTopic msg1,意思是收到了message,主题是testTopic,内容是msg1

可以同时订阅多个主题,

subscribe testTopic1 testTopic2 testTopic3

也可以按照模式匹配的方式来订阅主题psubscribe,比如订阅所有以test开头的主题

psubscribe test*

PubSub的消息没有持久化,发布消息之后如果没有消费者、那么消息会直接丢弃,如果消费者刚好此时宕机、重启后也不会收到宕机时发布的消息,所以PubSub来做消息队列的场景十分有限。

Redis5.0之后提供了更强大的Redis Stream数据结构,解决了上述问题。

3、Redis Stream

发布订阅模式,一个消息发布到stream里,多个消费组订阅了这个stream的话都可以收到这个消息,且消息是持久化的。消费组在stream里用last_delivered_id来定位偏移量、也就是消费到哪个消息了。从设计上看确实跟kafka非常像,Redis作者也说借鉴了kafka。

从数据结构上来说,stream是个链表,节点是消息,消息有ID,消息内容是一系列的k-v对。stream也有消费组和消费者的概念,每个消费者组用last_delivered_id游标指向链表中的节点、来表示消费到哪个ID的消息了,类似kafka里的offset偏移量,每个消费者组里可以有多个消费者,一个消息只会被投递给消费者组里的一个消费者,可以类比RocketMQ里的集群消息。另外,每个消费者内部还有一个 pending_ids数组,它记录着这个消费者已经被客户端读取了的、但客户端没有回复ACK的消息,由此确保消息至少被消费1次。

添加消息和直接读取消息命令:

xadd testTopic * name douchuzi  #向testTopic这个stream添加"name douchuzi"这个消息,*表示消息ID由Redis自动生成

消息ID可以由Redis自动生成,生成的ID类似"1641816045243-0",表示1641816045243这个时间戳这1ms生成的第0个消息。当然消息ID也可以由客户端自己生成。

xlen testTopic #查看有多少消息
xrange testTopic - + #从头到尾返回所有消息
xread count 1 streams testTopic 0 #从ID大于0处开始读取1个消息
xread count 1 block 0 streams testTopic $ #阻塞的从ID大于$处开始读取1个消息,$表示最后一个消息的ID

消费组命令:

xgroup create testTopic group1 $ #创建消费组group1,其last_delivered_id是$

xgroup的完整玩法:

xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
#让消费者consumer1-1从group1消费者组的testTopic stream中拿最新的、并且没有被发送给其他消费者处理的entry
xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >

发送ACK,刚才阻塞xreadgroup之后从别的客户端xadd了1条消息,然后xreadgroup阻塞结束,消息收到。这时候我们看下consumer1-1的pending_ids:

127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
1) 1) "1641819182032-0"
   2) "consumer1-1"
   3) (integer) 695470
   4) (integer) 1

xpending查询消费组group1中消费者consumer1-1的1000条未收到客户端回复的消息。

接着我们xack回复一下,再看看pending_ids:

127.0.0.1:7001> xack testTopic group1 1641819182032-0
(integer) 1
127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
(empty array)

可见,xack之后,consumer1-1的pending_ids为空了,Redis Stream用这个办法来确保消息一定被投递。

二、实战开发:基于SpringBoot开发Redis消息队列

下面具体实战一下,用SpringBoot来做Redis的消息队列开发,笔者使用的SpringBoot版本是2.3.7.RELEASE,其默认的客户端是lettuce 5.3.5.RELEASE,测试所用的Redis为6节点Cluster,可以参照笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)进行搭建。

1、PubSub

发布消息,比较简单,直接用RedisTemplate:

@Service
public class RedisDao {
    
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
    
    /**
     * PubSub发布消息
     * */
    public void publishMessage(ChannelTopic channelTopic, String message) {
        redisTemplate.convertAndSend(channelTopic.getTopic(), message);
    }
}

订阅消息,使用Spring提供的消息容器RedisMessageListenerContainer以及消息到达监听接口MessageListener

//配置PubSub消息容器
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory());
    
    /**这里可以使用自定义注解来发现所有的MessageHandler,
     * 然后循环container.addMessageListener来达到自动配置消息订阅者的目的
     * 这样开发只需要编写MessageHandler的实现类就可以了
     */
    MessageHandler handler = new MessageHandlerImpl();
    container.addMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message, byte[] pattern) {
            handler.handleMsg(message);
        }
        
    }, handler.getChannelTopic());
    
    return container;
}

MessageHandler接口及其实现是MessageHandlerImpl业务层代码:

/**
 * PubSub订阅消息
 * 
 * */
public interface MessageHandler {
    
    //订阅消息到达后的逻辑处理
    public void handleMsg(Message msg);
    
    //消息的Topic
    public ChannelTopic getChannelTopic();
}

@Slf4j
public class MessageHandlerImpl implements MessageHandler{

    @Override
    public void handleMsg(Message msg) {
        
        try {   
            String msgChannel = new String(msg.getChannel(), "utf-8");
            String msgBody = new String(msg.getBody(), "utf-8");
            log.info("收到消息:");
            log.info("Message channel : {}" , msgChannel);
            log.info("Message body : {}" , msgBody);
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public ChannelTopic getChannelTopic() {
        return new ChannelTopic("testPubSub");
    }

}

另外需要注意,Redis Cluster模式下如果客户端用的是Lettuce,需要配置客户端自适应刷新,在集群主备故障切换的时候、客户端能够自动切换到故障主节点对应的从节点去。详见笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)

好了,测试一下:

@Slf4j
@SpringBootApplication
public class RedismqApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
        RedisDao redisDao = context.getBean(RedisDao.class);
        redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子爱豆畜子");
    }

}

收到消息:
Message channel : testPubSub
Message body : 肥兔子爱豆畜子

2、Redis Stream

跟PubSub类似,也是需要消息容器MessageContainer、Listener这俩东西。

/**
 * 发送消息到指定stream
 * */
public void publishStreamMessage(String stream, Object message) {
    ObjectRecord<String, String> record =                 StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
    RecordId recordId = stringRedisTemplate.opsForStream().add(record);
    log.info("消息已发送,消息ID:{}" , recordId.getValue());
}

消息监听,实现StreamListener接口:

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
    
    private StringRedisTemplate redisTemplate;

    public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
        redisTemplate = stringRedisTemplate;
    }
    
    @Override
    public void onMessage(ObjectRecord<String, String> message) {

        RecordId id = message.getId();
        String topic = message.getStream();
        String msgBody = message.getValue();
        
        log.info("收到主题{}消息ID={}, 消息内容{}", topic, id.getValue(), msgBody);
        
        String group = "some-service"; //消费组,使用服务名
        redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
    }

}

配置消息容器,将StreamListener的实现注册到消息容器StreamMessageListenerContainer

@Configuration
public class RedisStreamListenerContainerConfig {
    
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Bean
    public StreamMessageListenerContainer redisStreamListenerContainer() {
        StreamMessageListenerContainerOptions options = 
                StreamMessageListenerContainerOptions.builder()
                                                    .batchSize(100)
                                                    .pollTimeout(Duration.ZERO)
                                                    .targetType(String.class)
                                                    .build();
        StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        String GroupName = "some-service";  //消费组命名一般用服务名
        String consumerName = "127.0.0.1:8080"; //消费者命名一般用服务集群下每个节点的ip:port,可以区分是哪个节点消费
        String stream = "testTopic"; //stream名称,即topic
        
        container.receive(Consumer.from(GroupName, consumerName), 
                            StreamOffset.create(stream, ReadOffset.lastConsumed()),
                            new StreamMessageListener(stringRedisTemplate)); //将Listener添加到监听容器
        
        container.start(); //启动消息容器
        
        return container;
    }
}

测试:

消息已发送,消息ID:1641872849111-0

收到主题testTopic消息ID=1641872849111-0, 消息内容{"name":"stream-肥兔子爱豆畜子"}

总结说明:

1、为了方便消息的格式笔者统一用了String类型,用fastjson做序列化以后进行传输。

2、StreamListener.onMessage收到消息进行处理以后,手工调用ack进行回复,不然服务端给当前消费者缓存的pending_ids会越来越大、占用内存。

3、消费组笔者一般用服务名来区分,服务下挂多个节点,那么每个节点可以用ip:port作为唯一标识,所以用ip:port作为消费组下的消费组名称。

参考:《Redis深度历险:核心原理与应用实践》

       [Redis 的发布订阅功能在 SpringBoot 中的应用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)

       [Stream消息队列在SpringBoot中的实践与踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)

      [redis — 基于Spring Boot实现Redis stream实时流事件处理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)
上一篇下一篇

猜你喜欢

热点阅读