Spring Boot消息队列敲门砖:Redis中的发布订阅模式
2020-12-06 本文已影响0人
狄仁杰666
前言
来啦老铁!
笔者学习Spring Boot有一段时间了,截至目前已实践、总结了25篇Spring Boot系列学习文章,感兴趣的同学可以关注专题一起学习吧!
Spring Boot全家桶在前2篇文章中,我们一起实践实现了一个基本的个人云盘应用,我个人会在后续继续优化功能,以达到能交付使用的程度,有关注的同学,可以注意一下git仓库内的代码变化!
而接下来我们来学点什么呢?
接下来我打算开始学习Message Queue,即消息队列。
笔者之前接触过SQS(Simpler Queue Service),但仅有一点点经验,不足挂齿,而反观消息队列的重要性、在实际应用场景的广泛性等,觉得是时候推开未知的大门,看个究竟了!
笔者计划基于Spring Boot从几个方面入手学习消息队列:
- Redis中的发布订阅模式;
- Spring Boot中集成RocketMQ;
- Spring Boot中集成RabbitMQ;
- Spring Boot中集成Kafka;
- 尝试对几款消息队列系统进行对比、评价!
生命有限,今天我们仅先学习Redis中的发布订阅模式!
代码基于之前学习Redis时使用的Git Hub仓库演进,欢迎取阅:
整体步骤
- 编写消息发布接口;
- 编写消息订阅监听实现类;
- 配置消息订阅监听;
- Redis发布订阅演示;
项目代码改动如下:
项目代码改动1. 编写消息发布接口;
通过调用接口的方式,将消息发布到channel中:
package com.github.dylanz666.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author : dylanz
* @since : 12/06/2020
*/
@Component
public class MessageReceiver implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] c = message.getChannel();
byte[] b = message.getBody();
try {
String channel = new String(c, StandardCharsets.UTF_8);
String body = new String(b, StandardCharsets.UTF_8);
//在此处做接收到消息后的业务处理
System.out.println("channel: " + channel);
System.out.println("body: " + body);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2. 编写消息订阅监听实现类;
可以新建一个listener包,在包内新建一个消息订阅监听实现类MessageReceiver.java(名字不限),织入如下代码:
package com.github.dylanz666.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author : dylanz
* @since : 12/06/2020
*/
@Component
public class MessageReceiver implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] c = message.getChannel();
byte[] b = message.getBody();
try {
String channel = new String(c, StandardCharsets.UTF_8);
String body = new String(b, StandardCharsets.UTF_8);
//在此处做接收到消息后的业务处理
System.out.println("channel: " + channel);
System.out.println("body: " + body);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 配置消息订阅监听;
监听实现类还不够,我们还要告诉Spring Boot,你要用我的这个监听啦,不然Spring Boot不知道呀!在RedisConfig类中添加配置代码:
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
//此处添加消息监听(即订阅),可根据channel的业务划分情况,在此处统一加入多个消息监听
container.addMessageListener(messageListenerAdapter, new PatternTopic("channel:demo"));
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(MessageReceiver myRedisMessageListener) {
return new MessageListenerAdapter(myRedisMessageListener, "onMessage");
}
当然,我这里把发布者和订阅者都放在同一个服务内,这在实际场景中基本是不可能出现的,正常二者是处于不同的服务,通过消息队列系统实现解耦。
本例只作演示、学习用,请忽略这个情况哈!
4. Redis发布订阅演示;
启动项目后,浏览器直接调用发布API,如:
-
http://127.0.0.1:8080/api/message?message=this%20is%20a%20test%20message
(空格被浏览器自动转义了,捂脸)
发布
可见,当我们通过Redis发布消息后,订阅者自动地获得了消息。
事实上,当一个channel被多个订阅者订阅后,一有消息发布到该channel,则所有的订阅者都能接收到该消息,如图:
发布-订阅(来自菜鸟教程)我们可以把这个东西理解为群聊,当群里有一个人发了一条消息(发布者),其他所有在群里的人(订阅者)都能接收到消息!
整个打通的过程十分简单,当然也正是因为其简单,也隐藏了一些问题,例如:
-
如何保证数据传输的可靠性,因为很有可能订阅者过程由于网络等原因导致订阅的数据丢失;
-
如何保证Redis本身在该发布订阅模式下的稳定性,因为Redis本身运行于内存,受限于内存,当发布的消息不能被及时消费,消息将不断积累,严重的话将最终导致Redis本身挂掉!
尽管如此,我们仍可以将之作为学习消息队列的敲门砖,您说呢?
如果本文对您有帮助,麻烦点赞、关注!
谢谢!