SpringCloud之Stream-7.延迟消息
首先先讲一个例子:
延迟消息概念
延迟消息很好理解,就是一种不会立即被消费,而是延迟到未来某个时间点才能被消费的消息类型
阿里新零售业务库存发布
在电商场景里商品库存是一个核心数据,上下游很多链路都围绕库存做足了文章。在我们的业务有两个和库存紧密联系的业务名词
l 今日/明日库存 今日库存就是当日可供售卖的库存量,同理,明日库存就是下一日的可用库存量
l 库存计划 运营和采购要对商品的上架数量做出评估,然后整理出未来几天的库存计划表,这个计划表决定了未来每日新增库存的数量
从上面两个名词,我们不难看出来“今日”与“明日”的界限是个关键事件点,也就是说,库存计划需要在每日的0点准时生效,为第二日的商品发布新的库存数量。这个场景天然适合使用延迟消息来实现,我们当时的流程设计是这样的。
image.pngl 库存中心 由运营添加库存计划到库存中心,后台服务会将库存发布计划作为延迟消息发送到消息组件。至于要延迟多长时间生效,则是用库存生效时间减去当前时间计算出来的。比如明日库存计划的生效时间是0点减去当前时间
l 消息中间件 消息中间件在接收到库存发布消息后,根据其中设置的延迟生效时间向消息队列中添加新消息
l 商品服务 所有商品服务作为一个消费者分组,订阅了消息组件中的库存发布消息,一旦有新消息到来,这条消息就会被该分组内的某个服务消费
开启延迟消息
打开RabbitMQ官方的插件下载页面
https://www.rabbitmq.com/community-plugins.html
网址,从中找到
rabbitmq_delayed_message_exchange
这个插件。
上面就是延迟插件的下载列表,不同版本的RabbitMQ对应不同版本的插件,同学们先看一下你的安装版本,然后选择对应的插件下载。比如我的RabbitMQ版本是3.7.15,所以选择3.7.x版本的插件。
安装插件
下载完成后将插件解压,把解压后的文件copy到RabbitMQ安装目录下的plugins文件夹。然后运行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这个命令安装插件。安装好后你会看到日志中打印出了这个插件的名称
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_shovel
rabbitmq_shovel_management
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LM-SHC-1650...
重启服务
本地直接执行
rabbitmqctl stop
命令关闭RabbitMQ(前提是安装路径已经添加到系统变量中,否则要先进入安装目录后再执行该命令)。待完全关闭之后,再执行
rabbitmq-server
命令启动服务。
对Mac系统来说,添加RabbitMQ到系统变量只要修改
~/.bash_profile
添加这行就可以了:
export PATH=RABBIT_HOME/sbin
。对Windows同理,需要你向系统变量的PATH属性中添加RabbitMQ的路径信息。
到这一步,准备工作就大功告成了。
实战
image.png创建一个新的topic类
public interface DelayedTopic {
String INPUT = "delayed-consumer";
String OUTPUT = "delayed-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
@PostMapping("sendDelayedMsg")
public void sendDelayedMsg(@RequestParam(value="body")String body,
@RequestParam(value="seconds")Integer seconds){
MessageBean msg = new MessageBean();
msg.setPayload(body);
log.info("ready to send delayed message");
delayedTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("x-delay",1000*seconds).build());
}
监听器也要添加一下
@Slf4j
@EnableBinding(
value = {
GroupTopic.class,DelayedTopic.class
}
)
public class StreamConsumer {
@StreamListener(GroupTopic.INPUT)
public void consumeGroupMessage(Object payload){
log.info("Group message consumed successfully,payload={}",payload);
}
@StreamListener(DelayedTopic.INPUT)
public void consumeDelayedMessage(MessageBean messageBean){
log.info("Delayed message consumed successfully,payload={}",messageBean.getPayload());
}
}
添加一下延迟消息的配置,最后的是打开延迟的exchange
#延迟消息配置
spring.cloud.stream.bindings.delayed-consumer.destination=delayed-topic
spring.cloud.stream.bindings.delayed-producer.destination=delayed-topic
spring.cloud.stream.rabbit.bindings.delayed-producer.producer.delayed-exchange=true
启动之后可以去rabbitmq的界面看一下:
image.png
图中的DM意思就是dalayed_message,如果有这个标签就代表是延迟消息
发送接口做测试:
image.png image.png