Destination高级特性
长恨人生不如水,等闲平地起波澜。 —— 刘禹锡
1. Wildcards(通配符)
Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
ActiveMQ支持以下三种通配符:
- ".":用于作为路径上名字间的分隔符
- ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
- "*":用于作为路径上任何名字。
举例来说,如有以下两个Destination:
PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)
1. PRICE.> :匹配任何产品的价格变动
2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
JAVA代码示例:
生产者:
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 创建生产者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 1; i <= 10; i++) {
TextMessage textMessage = session.createTextMessage(message);
messageProducer.send("Mac Air价格:" + i * 1000);
System.out.println("发送消息 - " + textMessage.getText());
}
session.commit();
消费者:
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.>");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
生产者发送消息日志:
发送消息:Mac Air价格:1000
发送消息:Mac Air价格:2000
发送消息:Mac Air价格:3000
发送消息:Mac Air价格:4000
发送消息:Mac Air价格:5000
发送消息:Mac Air价格:6000
发送消息:Mac Air价格:7000
发送消息:Mac Air价格:8000
发送消息:Mac Air价格:9000
发送消息:Mac Air价格:10000
消费者接收到的消息日志:
收到的消息:Mac Air价格:1000
收到的消息:Mac Air价格:2000
收到的消息:Mac Air价格:3000
收到的消息:Mac Air价格:4000
收到的消息:Mac Air价格:5000
收到的消息:Mac Air价格:6000
收到的消息:Mac Air价格:7000
收到的消息:Mac Air价格:8000
收到的消息:Mac Air价格:9000
收到的消息:Mac Air价格:10000
从消费者的接收日志来看,当消费者使用通配符队列时,是能正常接收消息的。
通配符中是为消费者服务的。即:通配符只能配置在消费端。
2. 组合列队(Composite Destinations)
组合列队即通过一个虚拟的Destination代表多个Destination,这样就可以通过该虚拟的Destination同时向多个Destination发送消息。
实现虚拟Destination可通过以下方式实现:
(1) 客户端方式
生产者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,PRICE.COMPUTER.TMALL.APPLE");
// 多个Destination使用,分隔
消费者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 或者
Destination destination = session.createQueue("PRICE.COMPUTER.TMALL.APPLE");
// 以上两个消费者都能接收到生产者的消息
若使用不同类型的destination,那么需要加上前缀如queue:// 或topic://。如:Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,topic://PRICE.COMPUTER.TMALL.APPLE");
(2) 配置方式
在activemq.xml的broker中添加以下配置:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="PRICE.COMPUTER">
<forwardTo>
<queue physicalName="PRICE.COMPUTER.JD.APPLE"/>
<queue physicalName="PRICE.COMPUTER.TMALL.APPLE" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
然后在生产者中,直接向队列PRICE.COMPUTER中发送消息。那么监听PRICE.COMPUTER.JD.APPLE或PRICE.COMPUTER.TMALL.APPLE的消费者都能同时接收到生产者发送的消息。
注意:修改完配置文件后,须重启Broker服务,不然配置是不会生效的!
3. 配置启动队列
若需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:
<destinations>
<queue physicalName="PRICE.COMPUTER.JD.APPLE" />
<topic physicalName="PRICE.COMPUTER.TMALL.APPLE" />
</destinations>
4. 队列选项
队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
3:consumer.noLocal :默认false。
4:consumer.dispatchAsync :是否异步分发 ,默认true。
5:consumer.retroactive:是否为回溯消费者 ,默认false。
6:consumer.selector:Jms的Selector,默认null。
7:consumer.exclusive:是否为独占消费者 ,默认false。
8:consumer.priority:设置消费者的优先级,默认0。
示例如下:
queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
consumer = session.createConsumer(queue);
5. 虚拟Destination
ActiveMQ支持虚拟Destination有以下两种方式:
(1) 虚拟主题
(2) 组合Destinations(前面已介绍过)
5.1 为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
(1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。
(2) 由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
在ActiveMQ,可以通过虚拟主题来解决以上两个问题。
5.2 虚拟主题的使用
对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:
Topic destination = session.createTopic("VirtualTopic.Mobille");
对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:
应用A的消费者:
Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mobile");
应用A的消费者启动两个及以上,下面称为A1、A2.
应用B的消费者:
Destination destination = session.createQueue("Consumer.B.VirtualTopic.Mobile");
应用A的消费者和应用B的消费者依次启动订阅,发布者发送消息。
应用A消费者A1消费消息日志:
收到的消息:ActiveMQ 发送的消息:10
收到的消息:ActiveMQ 发送的消息:12
收到的消息:ActiveMQ 发送的消息:14
收到的消息:ActiveMQ 发送的消息:16
收到的消息:ActiveMQ 发送的消息:18
应用A消费者A2消费消息日志:
收到的消息:ActiveMQ 发送的消息:11
收到的消息:ActiveMQ 发送的消息:13
收到的消息:ActiveMQ 发送的消息:15
收到的消息:ActiveMQ 发送的消息:17
收到的消息:ActiveMQ 发送的消息:19
应用B消费者消息日志:
收到的消息:ActiveMQ 发送的消息:10
收到的消息:ActiveMQ 发送的消息:11
收到的消息:ActiveMQ 发送的消息:12
收到的消息:ActiveMQ 发送的消息:13
收到的消息:ActiveMQ 发送的消息:14
收到的消息:ActiveMQ 发送的消息:15
收到的消息:ActiveMQ 发送的消息:16
收到的消息:ActiveMQ 发送的消息:17
收到的消息:ActiveMQ 发送的消息:18
收到的消息:ActiveMQ 发送的消息:19
从以上消费者日志可以看出,应用A的A1、A2两个消费者共同消费了10条消息,而应用B的消费者也同样消费了10条消息,但应用B只要一个消费者。
注意:若是activemq.xml中配置了destinationInterceptors,则需要将destinationInterceptors删除,然后重启Broker服务。
虚拟主题的前缀默认是VirtualTopic,若是需要修改,可通过以下方式修改:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>