SpringCloud之Stream-7.延迟消息

2021-10-29  本文已影响0人  那钱有着落吗

首先先讲一个例子:

延迟消息概念

延迟消息很好理解,就是一种不会立即被消费,而是延迟到未来某个时间点才能被消费的消息类型

阿里新零售业务库存发布

在电商场景里商品库存是一个核心数据,上下游很多链路都围绕库存做足了文章。在我们的业务有两个和库存紧密联系的业务名词

l 今日/明日库存 今日库存就是当日可供售卖的库存量,同理,明日库存就是下一日的可用库存量

l 库存计划 运营和采购要对商品的上架数量做出评估,然后整理出未来几天的库存计划表,这个计划表决定了未来每日新增库存的数量

从上面两个名词,我们不难看出来“今日”与“明日”的界限是个关键事件点,也就是说,库存计划需要在每日的0点准时生效,为第二日的商品发布新的库存数量。这个场景天然适合使用延迟消息来实现,我们当时的流程设计是这样的。

image.png

l 库存中心 由运营添加库存计划到库存中心,后台服务会将库存发布计划作为延迟消息发送到消息组件。至于要延迟多长时间生效,则是用库存生效时间减去当前时间计算出来的。比如明日库存计划的生效时间是0点减去当前时间

l 消息中间件 消息中间件在接收到库存发布消息后,根据其中设置的延迟生效时间向消息队列中添加新消息

l 商品服务 所有商品服务作为一个消费者分组,订阅了消息组件中的库存发布消息,一旦有新消息到来,这条消息就会被该分组内的某个服务消费

开启延迟消息

打开RabbitMQ官方的插件下载页面
https://www.rabbitmq.com/community-plugins.html
网址,从中找到
rabbitmq_delayed_message_exchange
这个插件。

image.png

上面就是延迟插件的下载列表,不同版本的RabbitMQ对应不同版本的插件,同学们先看一下你的安装版本,然后选择对应的插件下载。比如我的RabbitMQ版本是3.7.15,所以选择3.7.x版本的插件。

安装插件

下载完成后将插件解压,把解压后的文件copy到RabbitMQ安装目录下的plugins文件夹。然后运行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这个命令安装插件。安装好后你会看到日志中打印出了这个插件的名称

rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_shovel
rabbitmq_shovel_management
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LM-SHC-1650...

重启服务

本地直接执行
rabbitmqctl stop
命令关闭RabbitMQ(前提是安装路径已经添加到系统变量中,否则要先进入安装目录后再执行该命令)。待完全关闭之后,再执行
rabbitmq-server
命令启动服务。
对Mac系统来说,添加RabbitMQ到系统变量只要修改
~/.bash_profile
添加这行就可以了:
export PATH={PATH}:RABBIT_HOME/sbin
。对Windows同理,需要你向系统变量的PATH属性中添加RabbitMQ的路径信息。
到这一步,准备工作就大功告成了。

实战

image.png

创建一个新的topic类


public interface DelayedTopic {

    String INPUT = "delayed-consumer";

    String OUTPUT = "delayed-producer";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();
}
@PostMapping("sendDelayedMsg")
    public void sendDelayedMsg(@RequestParam(value="body")String body,
                               @RequestParam(value="seconds")Integer seconds){
        MessageBean msg = new MessageBean();
        msg.setPayload(body);

        log.info("ready to send delayed message");

        delayedTopicProducer.output().send(
                MessageBuilder.withPayload(msg)
                        .setHeader("x-delay",1000*seconds).build());
    }

监听器也要添加一下


@Slf4j
@EnableBinding(
        value = {
                GroupTopic.class,DelayedTopic.class
        }
)
public class StreamConsumer {

    @StreamListener(GroupTopic.INPUT)
    public void consumeGroupMessage(Object payload){
        log.info("Group message consumed successfully,payload={}",payload);
    }

    @StreamListener(DelayedTopic.INPUT)
    public void consumeDelayedMessage(MessageBean messageBean){
        log.info("Delayed message consumed successfully,payload={}",messageBean.getPayload());
    }


}

添加一下延迟消息的配置,最后的是打开延迟的exchange

#延迟消息配置
spring.cloud.stream.bindings.delayed-consumer.destination=delayed-topic
spring.cloud.stream.bindings.delayed-producer.destination=delayed-topic
spring.cloud.stream.rabbit.bindings.delayed-producer.producer.delayed-exchange=true

启动之后可以去rabbitmq的界面看一下:


image.png

图中的DM意思就是dalayed_message,如果有这个标签就代表是延迟消息

发送接口做测试:


image.png image.png
上一篇下一篇

猜你喜欢

热点阅读