ActiveMQ之Consumer
1 Exclusive Consume(独占消费)
默认Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。 ActiveMQ从4.x版本开始支持Exclusive Consumer。Broker会从多个Consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其他的consumer。可以通过destination options来创建一个Exclusive Consumer,如下:
private static final String queueName = "myQueue?consumer.exclusive=true";
说明:
这个会独占这个队列频道,所有的消息都将发发到这个连接上。在多线程环境下仍然是就这一个连接可以获取到消息。
2 Consumer Dispatche Async(消息异步分发)
在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者。可以设置dispatchAsync属性,默认是true,通常情况下这是最佳的。 你也可以通过如下几种方式修改:
- 在ConnectionFactory层设置
ActiveMQConnectionFactory.setDispatchAsync(false);
- 在Connection上设置,这个设置将会覆盖ConnectionFactory上的设置
ActiveMQConnetion.setDispatchAsync(false);
- 在Consumer上设置
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
关闭异步分发
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>
3 Consumer Priority(消息优先级)
当多个Consumer消费同一个队列的时候,可以为Consumer设置优先级来消费。
Consumer的Priority的划分为”0~127”个级别,127是最高的级别,0是最低的也是ActiveMQ默认的。这种配置可以让Broker根据consumer的优先级来发送消息到较高的优先级的Consumer上,如果某个较高的Consumer的消息消费慢,则Broker会把消息发送到仅次于它优先级的Consumer上。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
4 Manage Durable Subscribers(管理持久订阅)
持久订阅,保证了消费者离线之后,再次进入系统,不会错过消息,但是这也会造成消息的堆积。可以通过设置消息的过期时间来控制,然后定期检查过期消息并删除:
<policyEntry topic=">" expireMessagesPeriod="300000"/>
从5.6开始,可以对不活跃的持久化订阅进行清除。如下:
<broker name="localhost"
offlineDurableSubscriberTimeout="86400000"
offlineDurableSubscriberTaskSchedule="3600000">
说明:
offlineDurableSubscriberTimeout:离线多长时间就过期删除,缺省是-1,就是不删除。 offlineDurableSubscriberTaskSchedule: 多长时间检查一次,缺省300000,单位毫秒。
5 Message Groups(消息分组)
Message Goups就是对消息分组,它是Exclusive Consumer功能的增强。逻辑上Message Groups可以看成是一种并发的 Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS消息属性的JMSXGroupID用来区分message group。Message Group特性保证所有具有相同 JMSXGroupID 的消息都会被分发到相同的consumer(只要这个consumer保持active) 。
另一方面,Message Groups 特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息 JMSXGroupID 属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group.如果没有,那么broker会选择一个consumer,并将它关联到这个message group.此后,这个consumer会接收到这个message group的所有消息,直到:
-
consumer被关闭。
-
Message group被关闭(通过发送一个消息,并设置这个消息的 JMSXGroupSeq 为-1)。
创建一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
关闭一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq",-1);
6 Message Selectors(消息选择器)
JMS Selectors 用在获取消息的时候,可以基于消息属性和 Xpath 语法对消息进行过滤。JMS Selectors有SQL92语义定义。以下是个Selectors的例子:
//创建消费者的时候指定 Selector
MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID='GroupA'");
注意:
- JMS Selectors表达式中,可以使用IN, NOT IN, LIKE等。
- 需要注意的是,JMS Selectors 表达式中的日期和时间需要使用标准的Long型毫秒值。
- 表达式中的属性不会自动进行类型转换。
- Message Groups虽然可以保证具有相同的message group的消息会被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。
7 Redelivery Policy(重传策略)
当Consumer在接收到消息时,由于以下原因没有返回Ack信息,就会导致消息的重新投递。
- Client用了transactions,且在Session中调用了rollback()。
- Client用了transactions,且在调用commit()之前关闭。
- Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()。
- Client在超时回复Ack。
可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的重传策略,可用的Redelivery属性如下:
| property | default value | description |
|---|---|---|
| collisionAvoidanceFactor | 0.15 | 设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。 |
| maximumRedeliveries | 6 | 最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 |
| maximumRedeliveryDelay | -1 | 传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
| initialRedeliveryDelay | 1000L | 初始重发延迟时间。 |
| redeliveryDelay | 1000L | 重发延迟时间,当initialRedeliveryDelay=0时生效。 |
| useCollisionAvoidance | false | 启用防止冲突功能。 |
| useExponentialBackOff | false | 启用指数倍数递增的方式增加延迟时间。 |
| backOffMultiplier | 5 | 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。 |
示例
// 1创建ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);// 重传次数
policy.setInitialRedeliveryDelay(2 * 1000);
connectionFactory.setRedeliveryPolicy(policy);
8 Slow Consumer Handling(慢消费处理)
8.1 Prefetch机制
ActiveMQ通过Prefetch机制来提供性能,在客户端得内存里缓存一定数量得消息。缓存消息得数量由prefetch limit来控制。当某个 consumer 的 prefetch buffer 已经达到上限或者消息缓冲区满了,那么broker不会再向consumer分发消息,直到consumer像broker发送消息的确认,确认后的消息将会从缓存中去掉。ActiveMQ默认预取数量如下:
| 转发模式 | 队列类型 | prefetchSize |
|---|---|---|
| PERSISTENT | Queue | 1000 |
| NON_PERSISTENT | Queue | 1000 |
| PERSISTENT | Topic | 100 |
| NON_PERSISTENT | Topic | 32766 |
注意:
如果预取数量设置为1,将会导致消息一条一条的推送。如果预取数量被设置为0,将会导致关闭Broker的推送功能,需要消费者主动拉取数据。
prefetchk可以通过如下方式 设置:
- 通过URL
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
或
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
- 通过ActiveMQPrefetchPolicy策略对象修改
ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(200);
- 通过Properties属性设置
Properties properties = connectionFactory.getProperties();
properties.setProperty("prefetchPolicy.queuePrefetch","1000");
8.2 慢消费处理
生产者在发送消息,Broker在将消息转发给消费者时,如果发现其内部有大量消息没有消费完成,那么Broker就会认为该消费者是慢消费者。在队列模式下,如果已发送,但没有确认的消息数量大于 prefetchSize,则消费者会被标记为 Slow。在主题模式下,如果cacheLimit已满,但是向主题的订阅者要发送的消息大于 prefetchSize,那么订阅者将被标记为 Slow。
由于慢消费者会导致消息堆积,消耗内存,从而导致内存数据和磁盘文件不停交换,消耗磁盘IO。同时还会影响生产者生成消息的速率。ActiveMQ使用等待消息限制策略(Pending Message Limit Strategy)来解决这个问题,当超过这个上限后有新消息到来时将根据不同的策略抛弃。
- 等待消息限制
目前等待消息限制策略有以下两种:
- Constant Pending Message Limit Strategy
limit 可以设置0, > 0, -1三种方式:0表示:不额外的增加其预存大小,> 0表示:在额外的增加其预存大小,-1表示:不增加预存也不丢弃旧的消息,这个策略使用常量限制,配置如下:
<constantPendingMessageLimitStrategy limit="50"/>
- Prefetch Rate Pending Message LimitStrategy
这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
注意:
在以上两种方式中,如果设置了0,意味着除了prefetch之外不再缓存消息,如果设置了-1意味着禁止丢弃消息。
- 消息丢弃
目前消息丢弃策略有三种:
-
oldestMessageEvictionStrategy:这个策略丢弃最旧的消息。
-
oldestMessageWithLowestPriorityEvictionStrategy: 这个策略丢弃最旧的,而且具有最低优先级的消息。
-
uniquePropertyMessageEvictionStrategy:从5.6开始,可以根据自定义的属性来进行抛弃。
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>表示要抛弃属性名称为Stock的消息。
示例:
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<messageEvictionStrategy>
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
</messageEvictionStrategy>
</policyEntry>
9 Subscription Recovery Policy(订阅恢复策略)
生产者在某个topic发送了多条消息后,这个时候非持久订阅者才订阅,那么它是不能获取之前生产者发送的信息的。或者,由于网络问题,非持久类型的消费者处于非活跃状态,无法接收到生产者发送的消息。使用消息恢复策略,可以解决上面的问题。ActiveMQ目前支持一个定时或固定大小的恢复缓冲区,在你连接到broker后,在一段时间内的消息会重新发送给订阅者。
| Policy Name | Sample Configuration | Description |
|---|---|---|
| FixedSizedSubscriptionRecoveryPolicy | <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/> |
保留固定字节的消息。 |
| FixedCountSubscriptionRecoveryPolicy | <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> |
保留固定数量的消息。 |
| LastImageSubscriptionRecoveryPolicy | <lastImageSubscriptionRecoveryPolicy/> |
保留最后一条记录。 |
| NoSubscriptionRecoveryPolicy | <noSubscriptionRecoveryPolicy/> |
禁用回溯,这是默认配置。 |
| QueryBasedSubscriptionRecoveryPolicy | <queryBasedSubscriptionRecoveryPolicy query="JMSType = 'car' AND color = 'blue'"/> |
根据查询机制使用回溯。 |
| TimedSubscriptionRecoveryPolicy | <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> |
保留指定时间内的消息。 |
| RetainedMessageSubscriptionRecoveryPolicy | <retainedMessageSubscriptionRecoveryPolicy/> |
保留ActiveMQ.Retain属性值为true的最后1条消息。 |
示例:
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
注意:
需要设置retroactive属性为true。如下:
Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息 - hapjin - 博客园
ActiveMQ