SpringBoot整合rabbitmq
2022-01-06 本文已影响0人
任未然
一. 概述
参考开源项目https://github.com/xkcoding/spring-boot-demo
本Demo简单集成rabbitmq的使用
二. 安装rabbitmq
用docker安装, 没安装
docker
的先安装docker
-
下载镜像:
docker pull rabbitmq:3.7.7-management
-
运行容器:
docker run -d -p 5671:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 --name rabbit-3.7.7 rabbitmq:3.7.7-management
-
浏览器打开网址
http://localhost:15672/
image.png默认账号密码: guest/guest
- 新建交换机,命名为: topic.wpr
- 新建队列
队列名为:
queue.2
, 队列绑定交换机:topic.wpr
,路由Key:queue.wpr
image.pngRoutingKey规则
- 路由格式必须以
.
分隔,比如user.email
或者user.aaa.email
- 通配符
*
,代表一个占位符,或者说一个单词,比如路由为user.*
,那么user.email
可以匹配,但是user.aaa.email
就匹配不了- 通配符
#
,代表一个或多个占位符,或者说一个或多个单词,比如路由为user.#
,那么user.email
可以匹配,user.aaa.email
也可以匹配
三. SpringBoot项目
3.1 application.yml
server:
port: 8080
servlet:
context-path: /demo
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
retry:
#enabled:开启失败重试
enabled: true
#第一次重试的间隔时长
initial-interval: 1000ms
#最大重试次数
max-attempts: 3
#最长重试间隔,超过这个间隔将不再重试
max-interval: 10000ms
#下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
multiplier: 1
#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列
default-requeue-rejected: true
# 手动提交消息
acknowledge-mode: manual
direct:
# 手动提交消息
acknowledge-mode: manual
3.2 监听队列事件
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@RabbitListener(queues = "queue.2")
@Component
public class QueueTwoHandler {
@RabbitHandler
public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("队列2,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
// 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
3.3 消息发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendTopic1() {
rabbitTemplate.convertAndSend("topic.wpr", "queue.wpr", new MessageStruct("topic message"));
}
}