RabbitMQ六种队列模式-主题模式
从前面的几篇我们依次经历了 exchange
模式从 fanout
> direct
的转变过程,在 fanout
时,我们只能进行简单的广播,对应类型比较单一,使用 direct
后,消费者则可以进行一定程度的选择,但是,direct
还是有局限性,路由不支持多个条件。
direct
不支持匹配 routingKey
,一但绑定了就是绑定了,而 topic
主题模式支持规则匹配,只要符合 routingKey
就能发送到绑定的队列上。
1. 什么是主题模式
topics
主题模式跟 routing
路由模式类似,只不过路由模式是指定固定的路由键 routingKey
,而主题模式是可以模糊匹配路由键 routingKey
,类似于SQL中 =
和 like
的关系。
P 表示为生产者、 X 表示交换机、C1C2 表示为消费者,红色表示队列。
topics
模式与 routing
模式比较相近,topics
模式不能具有任意的 routingKey
,必须由
一个英文句点号“.”
分隔的字符串(我们将被句点号“.”
分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.fox"
。topics
routingKey
中可以存在两种特殊字符“”
与“#”
,用于做模糊匹配,其中“”
用于匹配一个单词,“#”
用于匹配多个单词(可以是零个)。
"*" 表示任何一个词
"#" 表示0或1个词
以上图中的配置为例:
如果一个消息的 routingKey
设置为 “xxx.orange.rabbit”
,那么该消息会同时路由到 Q1
与 Q2
,routingKey="lazy.orange.fox”
的消息会路由到Q1
与Q2
;
routingKey="lazy.brown.fox”
的消息会路由到 Q2
;
routingKey="lazy.pink.rabbit”
的消息会路由到 Q2
(只会投递给Q2
一次,虽然这个routingKey
与 Q2
的两个 bindingKey
都匹配);
routingKey="quick.brown.fox”
、routingKey="orange”
、routingKey="quick.orange.male.rabbit”
的消息将会被丢弃,因为它们没有匹配任何bindingKey
。
2. 代码部分
生产者
package cn.lovingliu.rabbitmq_topic.producer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 主题类型的消息生产者
* @Date:Created in 2020-01-16
*/
public class Producer {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.创建新的连接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = connection.createChannel();
/** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/** 4.发送消息 */
String routingKey = "log.info.error";
String msg = "topic_exchange_msg:" + routingKey;
System.out.println("[send] = " + msg);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
/** 5.关闭通道、连接 */
channel.close();
connection.close();
/** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */
}
}
消费者
* 消费者
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: * 消费者
* @Date:Created in 2020-01-16
*/
public class ConsumerLogXTopic {
private static final String QUEUE_NAME = "topic_consumer_info";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("log * 消费者启动");
/** 1.创建新的连接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = connection.createChannel();
/** 3.消费者关联队列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
/** 5.消费者监听队列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
# 消费者
package cn.lovingliu.rabbitmq_topic.consumer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description:
* @Date:Created in 2020-01-16
*/
public class ConsumerLogJTopic {
private static final String QUEUE_NAME = "topic_consumer_info";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("log # 消费者启动");
/** 1.创建新的连接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = connection.createChannel();
/** 3.消费者关联队列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取生产者消息:" + msg);
}
};
/** 5.消费者监听队列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3.运行截图
生产者
*消费者
#消费者
4.总结
1、topic
相对于之前几种算是比较复杂了,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey
),exchange
会将消息转发到所有关注主题能与 routeKey
模糊匹配的队列。
2、在进行绑定时,要提供一个该队列关心的主题,如“#.sscai.#”
表示该队列关心所有涉及 sscai
的消息(一个 routeKey
为 "club.sscai.tmax"
的消息会被转发到该队列)。
3、"#"
表示0个或若干个关键字,“”
表示一个关键字。如“club.”
能与“club.sscai”
匹配,无法与“club.sscai.xxx”
匹配;但是“club.#”
能与上述两者匹配。
4、同样,如果 exchange
没有发现能够与 routeKey
匹配的 Queue
,则会抛弃此消息。