消息队列实践(三)—— Exchange和Route
RabbitMQ官网有一篇文章用来介绍AMQP模型。AMQP是什么呢,它是RabbitMQ所支持的队列协议,即为:Advanced Message Queue Protocol,基于这个协议producer和consumer才可以和broker进行交互。
在我们之前的文章里介绍了producer - queue - consumer这样的结构,而在实际的AMQP中还有一个实例——Exchange,我们不妨看一下官网给出来的架构图:
AMQP 模型
这个是RabbitMQ完全的架构模型,我们看到Publisher将消息发布到Exchange,而非Queue,完全由Exchange决定将message发布到哪一个Queue,分发的依据来自route rules。这样做有什么好处呢?
- Publisher单纯的进行发布工作,它不用担心具体推送到哪一个队列;
- 在实际项目中,根据接受的数据类型不同可能存在多个队列,比如一个媒体中心的新闻数据有体育新闻、时事新闻、民生新闻等等,这些新闻类型是并行而不交叉的,所以每个类型都存在自己的Queue。Publisher发布的新闻到Exchange中会根据route rule找到对应的队列,而不会污染其它的队列数据。
从上面的例子我们可以看出Exchange在接受Publisher的message后,会有不同类型的分发,这涉及到Exchange的不同类型:Direct Exchange、Fanout Exchange、Topic Exchange、Headers Exchange、Default Exchange五类
1. Direct Exchange
从名字可以看出,它是一种简单粗暴的分派方式,publisher在发布message的时候,会给message一个routing key,比如说字符串“key”,当message到Exchange后,会被转发给routing key也为“key”的队列(可能有多个)。
2. Fanout Exchange
这种方式就是消息广播,所有与该类型Exchange绑定的Queue都会收到message。有多少个queue,Exchange就会复制多少分,然后转发。比如我们在腾讯体育看NBA直播,上面的比分数据时时变化,那么腾讯体育就可以将比分message发布给Fanout Exchange,这样让所有终端都可以收到。
3. Topic Exchange
它实际上是对Direct Exchange的强化,在Direct X 中routing key是绝对相等式匹配,那么Topic X提供模糊匹配的方式。
4. Header Exchange
以上Exchange方式都是通过Routing Key进行匹配转发,Header Exchange放弃使用Routing Key,而是采用Header Properties的方式。在创建Queue时会定义一个map1,这个map1中除了普通的key-value,还会有一个熟悉key:x-match,value:any/all;Producer在创建message的时候也会产生一个map2,Header X会将message中的map2和Queue中的map1进行比对,如果map1定义的x-match:any则代表只要map1与map2中的key-value有一个匹配,该message就会转发到该Queue中;如果map1定义的x-match:all则代表map1与map2中的key-value必须全部匹配,该message才会转发到该Queue中。
5. Default Exchange
Default X其实就是Direct X的简易版,如果Queue在定义Routing Key的时候设为空字符串,则默认这个Queue的Routing Key就是它的名字。其它过程和Direct X一致。
下面我们用Topic Exchange和Header Exchange作为实战样例,来看看Exchange的使用。
-
Topic Exchange
//Rev.java
package com.otof.rabbitmq.receive;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
private final String QUEUE_TOPIC_1 = "QUEUE_TOPIC1";
private final String QUEUE_TOPIC_2 = "QUEUE_TOPIC2";
private final String QUEUE_TOPIC_3 = "QUEUE_TOPIC3";
private final String QUEUE_HEADER = "QUEUE_HEADER";
private final String EXCHANGE_TOPIC = "TOPICX";
public void receiveMessage() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//创建一个Topic Exchange
channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
//声明3个队列
channel.queueDeclare(QUEUE_TOPIC_1, false, false, false, null);
channel.queueDeclare(QUEUE_TOPIC_2, false, false, false, null);
channel.queueDeclare(QUEUE_TOPIC_3, false, false, false, null);
//将三个队列都与Topic X关联,同时定义各自的Routing Key
//支持类似于topic.one *代表一个单词
channel.queueBind(QUEUE_TOPIC_1, EXCHANGE_TOPIC, "topic.*");
//支持类似于topic.one或topic.one.two #代表零个或多个以.分隔的单词
channel.queueBind(QUEUE_TOPIC_2, EXCHANGE_TOPIC, "topic.#");
//仅支持Routing Key=topic_not_pass的message
channel.queueBind(QUEUE_TOPIC_3, EXCHANGE_TOPIC, "topic_not_pass");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//定义成功入队后的回调函数
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("This is Queue 1 Received '" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("This is Queue 2 Received '" + message + "'");
};
DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("This is Queue 3 Received '" + message + "'");
};
channel.basicConsume(QUEUE_TOPIC_1, true, deliverCallback1, consumerTag -> {});
channel.basicConsume(QUEUE_TOPIC_2, true, deliverCallback2, consumerTag -> {});
channel.basicConsume(QUEUE_TOPIC_3, true, deliverCallback3, consumerTag -> {});
}
}
//Send.java
package com.otof.rabbitmq.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final String EXCHANGE_TOPIC = "TOPICX";
private final String QUEUE_TOPIC = "QUEUE_TOPIC";
public void sendMessage(String message, String key) {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明Exchange Topic,名字与Rev.java中一致
channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
//发布带Routing Key的message信息
channel.basicPublish(EXCHANGE_TOPIC, key, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//RabbitmqController.java
package com.otof.rabbitmq.controllers;
import com.otof.rabbitmq.receive.Recv;
import com.otof.rabbitmq.send.Send;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@RestController
public class RabbitmqController {
@PostMapping(path = "addToQueue")
public void addMessageToQueue(@RequestParam String message, @RequestParam String key) {
new Send().sendMessage(message, key);
}
@GetMapping(path = "getFromQueue")
public void getMessageFromQueue() throws IOException, TimeoutException {
new Recv().receiveMessage();
}
}
用postman请求getFromQueue,进行注册Exchange和Queue,这个过程叫做Subscription;然后请求addToQueue,将message插入队列。我们来看一下结果:
-
请求:当Key=topic
Key=topic
结果:匹配topic.#,#代表零个或多个以.分隔的字符串
key=topic结果 -
请求:Key=topic.one
key=topic.one结果 -
请求:Key=topic.one.two.three
Key=topic.one.two.three -
请求:Key= topic_not_pass
Key= topic_not_pass结果
以上就是Topic X的实践,把握以下几条关键点:
- 先注册后发布,Subscription-Public
- Public只针对Exchange,与Queue无关
- Exchange与Queue的绑定过程中需要定义Routing Key
-
Header Exchange
//Rev.java
package com.otof.rabbitmq.receive;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Recv {
private final String QUEUE_HEADER_1 = "QUEUE_HEADER1";
private final String QUEUE_HEADER_2 = "QUEUE_HEADER2";
private final String EXCHANGE_HEADER = "HEADERX";
public void receiveMessage() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明Header X
channel.exchangeDeclare(EXCHANGE_HEADER, "headers");
//任意满足key1=>aaa或key2=>bbb都通过
Map<String, Object> map1 = new HashMap(){{
put("key1", "aaa");
put("key2", "bbb");
put("x-match", "any");
}};
channel.queueDeclare(QUEUE_HEADER_1, false, false,false, null);
channel.queueBind(QUEUE_HEADER_1, EXCHANGE_HEADER, "", map1);
//全部满足key1=>aaa或key2=>bbb则通过
Map<String, Object> map2 = new HashMap(){{
put("key1", "aaa");
put("key2", "ccc");
put("x-match", "all");
}};
channel.queueDeclare(QUEUE_HEADER_2, false, false,false, null);
channel.queueBind(QUEUE_HEADER_2, EXCHANGE_HEADER, "", map2);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("This is Queue 1 Received '" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("This is Queue 2 Received '" + message + "'");
};
channel.basicConsume(QUEUE_HEADER_1, true, deliverCallback1, consumerTag -> {});
channel.basicConsume(QUEUE_HEADER_2, true, deliverCallback2, consumerTag -> {});
}
}
//Send.java
package com.otof.rabbitmq.send;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Send {
private final String EXCHANGE_TOPIC = "TOPICX";
private final String EXCHANGE_HEADER = "HEADERX";
public void sendMessage(Map<String, Object> map, String message) {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_HEADER, "headers");
BasicProperties basicProperties = new BasicProperties();
basicProperties = basicProperties.builder().headers(map).build();
channel.basicPublish(EXCHANGE_HEADER, "", basicProperties, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//RequestData.java
package com.otof.rabbitmq.entity;
import java.util.Map;
public class RequestData {
String message;
Map<String, Object> map;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Map<String, Object> getMap() {
return map;
}
public void setMap(Map<String, Object> map) {
this.map = map;
}
}
//RabbitmqController.java
package com.otof.rabbitmq.controllers;
import com.otof.rabbitmq.entity.RequestData;
import com.otof.rabbitmq.receive.Recv;
import com.otof.rabbitmq.send.Send;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@RestController
public class RabbitmqController {
@PostMapping(path = "addToQueue")
public void addMessageToQueue(@RequestBody RequestData requestData) {
new Send().sendMessage(requestData.getMap(), requestData.getMessage());
}
@GetMapping(path = "getFromQueue")
public void getMessageFromQueue() throws IOException, TimeoutException {
new Recv().receiveMessage();
}
}
我们看一下postman发送message和Key Routing
Key Routing
结果:
结果