springboot-rabbitmq之订阅/发布(三)
2020-11-12 本文已影响0人
前进的码农
概念
data:image/s3,"s3://crabby-images/03300/03300b3c660c0ccf481959023c47210ba1b11b89" alt=""
fanout模式下routing_key是被忽略的
实现
根据图,我们新建一个生产者,声明一个路由,2个队列,队列分别绑定该路由。生产者向这个路由发送消息。在新建一个消费者设置2个队列消息监听器,分别监听这2个消息队列。
生产者
生产者路由队列配置代码
@Component
@Data
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Autowired
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
//声明队列
@Bean("queue")
Queue queue(){
return QueueBuilder.durable("queue_fount").build();
}
//声明队列1
@Bean("queue01")
Queue queue01(){
return QueueBuilder.durable("queue_fount_01").build();
}
//声明路由注意路由的类型为fanout
@Bean
Exchange exchange(){
return ExchangeBuilder.fanoutExchange("ethan.exchange_fanout").build();
}
//绑定队列到路由
@Bean
Binding binding(@Qualifier("queue") Queue queue,
Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
//绑定队列1到路由
@Bean
Binding binding01(@Qualifier("queue01") Queue queue,
Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
生产者消息发送代码
@SpringBootTest
class RabbitPubsubProducerApplicationTests {
@Test
void contextLoads() {
}
@Autowired
MyBean myBean;
@Test
void test(){
//发送10条消息到路由
for (int i = 0; i <10 ; i++) {
myBean.getAmqpTemplate().convertAndSend("ethan.exchange_fanout","","queues:"+i);
}
}
}
消费者
2个队列监听器
@Component
@Slf4j
public class ReceiverBean {
@RabbitListener(queues = "queue_fount")
public void processMessage(String msg) {
log.info("queue_fount---"+msg);
}
@RabbitListener(queues = "queue_fount_01")
public void processMessage01(String msg) {
log.info("queue_fount_01---"+msg);
}
}
控制台效果
data:image/s3,"s3://crabby-images/a709e/a709eed4e18873e800e2ea40abcf86ec5155650c" alt=""
可以看出2个队列分别全部收到了10条消息
代码
消息生产者
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-pubsub-producer
消息消费者
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-pubsub-consumer