RabbitMQ 与 SpringBoot2.X 整合
publisher-confirms ,实现一个监听器用于监听 broker 端给我们返回的确认请求: RabbitTemplate.ConfirmCallback
publisher-returns, 保证消息对 broker 端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续处理,保证消息的路由成功: RabbitTemplate.ReturnCallback
注意一点,在发送消息的时候对 template 进行配置 mandatory = true 保证监听有效。在生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等
代码实现:
消费端代码地址:https://github.com/hmilyos/rabbitmq-springboot-consumer.git
生产端代码地址:https://github.com/hmilyos/rabbitmq-springboot-product.git
消费端:
依赖:
![](https://img.haomeiwen.com/i9167995/a60cf42bb40fd9e0.png)
核心配置在配置文件里面
首先配置手工确认签收模式,用于ACK 的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列(不建议重回队列)、根据业务记录日志等处理。
设置消费端的监听个数和最大个数,用于控制消费端的并发情况
![](https://img.haomeiwen.com/i9167995/6b9a368b69a8b66f.png)
![](https://img.haomeiwen.com/i9167995/cc8def8b8a214c49.png)
消费端的监听 RabbitListener 这个注解很好用!!!RabbitListener 是一个组合注解,里面可以注解配置 。@QueueBinding @Queue @Exchange 直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。
Message 使用的是 org.springframework.messaging.Message
![](https://img.haomeiwen.com/i9167995/b86ca79a68d4314e.png)
生产端的核心配置
spring.rabbitmq.template.mandatory=true 的意思是: return 的时候代表消息不可达,设置 broker 不自动删除该消息,而是返回到生产端,让我们进行一些后续的处理
![](https://img.haomeiwen.com/i9167995/aacde317875867df.png)
先写一个主配置类
![](https://img.haomeiwen.com/i9167995/c6e5995acbc18817.png)
注意 returnCallback 这里的 Message 使用的是 org.springframework.amqp.core.Message message
![](https://img.haomeiwen.com/i9167995/943b01869f3f796f.png)
注意这里的 Message 使用的是 org.springframework.messaging.Message
![](https://img.haomeiwen.com/i9167995/977b532d379807ea.png)
![](https://img.haomeiwen.com/i9167995/a8ff660983f07edc.png)
运行单元测试
![](https://img.haomeiwen.com/i9167995/6c01ccf0da28c702.png)
![](https://img.haomeiwen.com/i9167995/2ac62b6225bea733.png)
修改一下发送时的 routingkey ,模拟发送失败
![](https://img.haomeiwen.com/i9167995/9748a5e80c75f521.png)
就进入 returnCallback
![](https://img.haomeiwen.com/i9167995/fa2652b6f2e8c564.png)
改进一下代码,这回发送一个 Java 实体
先声明一些队列、交换机、routingKey 的配置
![](https://img.haomeiwen.com/i9167995/08fe51120b4ad7b7.png)
这里面有个特别需要注意的地方,Payload 里面的路径要跟 生产端的实体路径完全一致,要不然会找到不到这个类的!!!
![](https://img.haomeiwen.com/i9167995/8927154f139bded1.png)
注意实体要实现 Serializable 序列化接口,要不然发送消息会失败的!!
![](https://img.haomeiwen.com/i9167995/9715b4fa64a00323.png)
照样跟着写一个发消息的方法
![](https://img.haomeiwen.com/i9167995/d829ec349f06fc4c.png)
编写单元测试
![](https://img.haomeiwen.com/i9167995/2421fd52b43bc324.png)
运行单元测试
![](https://img.haomeiwen.com/i9167995/0d5568e72d31aeed.png)
验证 Java 实体消息发送成功
![](https://img.haomeiwen.com/i9167995/bb5b2e56cbd9f697.png)