RabbitMq(四):RabbitMq 消息路由
2019-06-04 本文已影响0人
aix91
消息路由的几种方式
- Direct:直连模式
- Topic: 转发模式
- Fanout :广播模式
1. Topic模式
topic 模式下可以使用统配符表示bingKey:'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词。由此可以实现一个queue接收多个路由的消息。
- 创建queue:注意queue的名字是该queue对用的routing key
@Bean(name = "topic_queue1")
public Queue topic_queue_1() {
return new Queue("topic.queue.1");
}
@Bean(name = "topic_queue2")
public Queue topic_queue_2() {
return new Queue("topic.queue.2");
}
- 创建TopicExchange
@Bean
public TopicExchange exchange() {
return new TopicExchange("topic_exchange");
}
- 创建binding
@Bean
public Binding bindingExchangeMessage1(@Qualifier("topic_queue1") Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("topic.queue.1");
}
@Bean
public Binding bindingExchangeMessage2(@Qualifier("topic_queue2") Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("topic.queue.*");
}
在为queue设置路由时,with里面可以指定其他的routing key,那么该queue就不仅可以接收在创建之初设置的routing key上传来的数据,还可以接收新绑定key上传来的消息。
- 发送消息
发送消息时要指定exchange和routingKey
template.convertAndSend("topic_exchange", "topic.queue.test", message + "_topic_exchange_test");
- 接收消息:监听queue
@RabbitListener(queues = "topic.queue.1")
public void process2(String message) {
logger.info("topic.queue.1_" + message);
}
@RabbitListener(queues = "topic.queue.2")
public void process3(String message) {
logger.info("topic.queue.2_" + message);
}
2. fanout 模式
广播模式下,不用理会routing key。FanoutExchange 会将消息传递到exchange绑定好的queue list上去。
- 创建queue
@Bean(name = "fanout_queue")
public Queue fanout_queue() {
return new Queue("fanout.queue.1");
}
@Bean(name = "fanout_queue2")
public Queue fanout_queue_2() {
return new Queue("fanout.queue.2");
}
@Bean(name = "fanout_queue3")
public Queue fanout_queue_3() {
return new Queue("fanout.queue.3");
}
- 创建FanoutExchange
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_exchange");
}
- 创建binding: 将需要被广播的queue绑定到fanout exchange上去
@Bean
public Binding fanoutBindingExchangeMessage1(@Qualifier("fanout_queue") Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding fanoutBindingExchangeMessage2(@Qualifier("fanout_queue2") Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding fanoutBindingExchangeMessage3(@Qualifier("fanout_queue3") Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
- 发送消息
template.convertAndSend("fanout_exchange", "", message + "_fanout_exchange");