springboot集成Rabbit Mq
2018-12-26 本文已影响0人
丶君为红颜酔
部署服务器(docker)
### docker-hub
docker pull ...
### 运行
sudo docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 Rabbit的镜像号
## 开放端口
firewall-cmd --list-ports
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --reload
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
使用过程发现的一些情况
- 流程是:客户端推送消息到exchange,如果有router-key(topic模式)则会推送到符合条件的队列里,如果使用订阅模式(fanout exchange) 则会推送给所有订阅的队列。
- 回调的使用是指:消息成功推送到指定队列,就会接受到回调。
- 多个java类监听同个队列,会轮流处理。
- topic模式(topic exchange)和订阅模式(fanout exchange)的区别在于 topic会使用到router-key,支持模糊匹配。
使用方法
## 配置文件
spring:
rabbitmq:
password: guest
port: 5672
virtual-host: /
host: 127.0.0.1
publisher-confirms: true
username: guest
## 配置类
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** 如果要进行消息回调,则这里必须要设置为true */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplatenew() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
### RabitMq 特色:回调确认(回调是指推送到队列即返回)
@Slf4j
@Component
public class CallBackSender implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplatenew;
public void send() {
rabbitTemplatenew.setConfirmCallback(this);
String msg = "callbackSender : i am callback sender";
System.out.println(msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("callbackSender UUID: " + correlationData.getId());
this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("callbakck confirm:{}", correlationData);
}
}
### 创建队列
@Bean
public Queue helloQueue() {
return new Queue("helloQueue");
}
### 创建exchange
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
### 生产者
/**
* 多消费者轮流消费:要求消费者处理相同逻辑
* this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
* @return
*/
@NotAuth
@GetMapping("/normal")
public Object test() {
for (int i = 0; i < 5; i++) {
helloSender1.send("hellomsg:" + i);
helloSender2.send("hellomsg:" + i);
}
return ResultUtil.success();
}
/*======================================= 主题发布 ======================================*/
@Autowired
TopicSender topicSender;
/**
* 主题发布模式,和订阅模式最大区别是支持模糊路由
* 1. this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
* 2. BindingBuilder.bind(new Queue("topic.messages")).to(exchange).with("topic.#");//模糊匹配成功,也能发送到这个队列
* 3. RabbitListener(queues = "topic.messages")
*
* @return
*/
@NotAuth
@GetMapping("/topic")
public Object topic() {
topicSender.send();
return ResultUtil.success();
}
/*======================================= 广播模式或者订阅模式 ======================================*/
@Autowired
FanoutSender fanoutSender;
/**
* 发布订阅模式(有订阅就能获取)
* 1. this.rabbitTemplate.convertAndSend("fanoutExchange",null, msgString);
* 2. BindingBuilder.bind(new Queue("fanout.A")).to(fanoutExchange);
* 3. RabbitListener(queues = "fanout.A")
*
* @return
*/
@NotAuth
@GetMapping("/fanout")
public Object fanout() {
fanoutSender.send();
return ResultUtil.success();
}
### 消费者
@Component
@RabbitListener(queues = "helloQueue")
@RabbitListener(queues = "topic.messages")
@RabbitListener(queues = "fanout.C")
public class Receiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Receiver : " + msg);
}
}