ActiveMQ之Consumer

2021-01-05  本文已影响0人  爱健身的兔子

1 Exclusive Consume(独占消费)

默认Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。 ActiveMQ从4.x版本开始支持Exclusive Consumer。Broker会从多个Consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其他的consumer。可以通过destination options来创建一个Exclusive Consumer,如下:

private static final String queueName = "myQueue?consumer.exclusive=true";

说明

这个会独占这个队列频道,所有的消息都将发发到这个连接上。在多线程环境下仍然是就这一个连接可以获取到消息。

2 Consumer Dispatche Async(消息异步分发)

在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者。可以设置dispatchAsync属性,默认是true,通常情况下这是最佳的。 你也可以通过如下几种方式修改:

  1. 在ConnectionFactory层设置
ActiveMQConnectionFactory.setDispatchAsync(false);
  1. 在Connection上设置,这个设置将会覆盖ConnectionFactory上的设置
ActiveMQConnetion.setDispatchAsync(false);
  1. 在Consumer上设置
  queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
  consumer = session.createConsumer(queue);

关闭异步分发

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>

3 Consumer Priority(消息优先级)

当多个Consumer消费同一个队列的时候,可以为Consumer设置优先级来消费。

Consumer的Priority的划分为”0~127”个级别,127是最高的级别,0是最低的也是ActiveMQ默认的。这种配置可以让Broker根据consumer的优先级来发送消息到较高的优先级的Consumer上,如果某个较高的Consumer的消息消费慢,则Broker会把消息发送到仅次于它优先级的Consumer上。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);

4 Manage Durable Subscribers(管理持久订阅)

持久订阅,保证了消费者离线之后,再次进入系统,不会错过消息,但是这也会造成消息的堆积。可以通过设置消息的过期时间来控制,然后定期检查过期消息并删除:

<policyEntry topic=">" expireMessagesPeriod="300000"/>

从5.6开始,可以对不活跃的持久化订阅进行清除。如下:

<broker name="localhost" 
        offlineDurableSubscriberTimeout="86400000" 
            offlineDurableSubscriberTaskSchedule="3600000">

说明

offlineDurableSubscriberTimeout:离线多长时间就过期删除,缺省是-1,就是不删除。 offlineDurableSubscriberTaskSchedule: 多长时间检查一次,缺省300000,单位毫秒。

5 Message Groups(消息分组)

Message Goups就是对消息分组,它是Exclusive Consumer功能的增强。逻辑上Message Groups可以看成是一种并发的 Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS消息属性的JMSXGroupID用来区分message group。Message Group特性保证所有具有相同 JMSXGroupID 的消息都会被分发到相同的consumer(只要这个consumer保持active)
另一方面,Message Groups 特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息 JMSXGroupID 属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group.如果没有,那么broker会选择一个consumer,并将它关联到这个message group.此后,这个consumer会接收到这个message group的所有消息,直到:

创建一个Message Groups,只需要在message对象上设置属性即可,如下:

message.setStringProperty("JMSXGroupID","GroupA");

关闭一个Message Groups,只需要在message对象上设置属性即可,如下:

 message.setStringProperty("JMSXGroupID","GroupA");
 message.setIntProperty("JMSXGroupSeq",-1);

6 Message Selectors(消息选择器)

JMS Selectors 用在获取消息的时候,可以基于消息属性和 Xpath 语法对消息进行过滤。JMS Selectors有SQL92语义定义。以下是个Selectors的例子:

//创建消费者的时候指定 Selector
MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID='GroupA'");

注意

  1. JMS Selectors表达式中,可以使用IN, NOT IN, LIKE等。
  2. 需要注意的是,JMS Selectors 表达式中的日期和时间需要使用标准的Long型毫秒值。
  3. 表达式中的属性不会自动进行类型转换。
  4. Message Groups虽然可以保证具有相同的message group的消息会被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。

7 Redelivery Policy(重传策略)

当Consumer在接收到消息时,由于以下原因没有返回Ack信息,就会导致消息的重新投递。

  1. Client用了transactions,且在Session中调用了rollback()。
  2. Client用了transactions,且在调用commit()之前关闭。
  3. Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()。
  4. Client在超时回复Ack。

可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的重传策略,可用的Redelivery属性如下:

property default value description
collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
maximumRedeliveries 6 最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay -1 传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay 1000L 初始重发延迟时间。
redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效。
useCollisionAvoidance false 启用防止冲突功能。
useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

示例

// 1创建ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);// 重传次数
policy.setInitialRedeliveryDelay(2 * 1000);
connectionFactory.setRedeliveryPolicy(policy);

8 Slow Consumer Handling(慢消费处理)

8.1 Prefetch机制

ActiveMQ通过Prefetch机制来提供性能,在客户端得内存里缓存一定数量得消息。缓存消息得数量由prefetch limit来控制。当某个 consumer 的 prefetch buffer 已经达到上限或者消息缓冲区满了,那么broker不会再向consumer分发消息,直到consumer像broker发送消息的确认,确认后的消息将会从缓存中去掉。ActiveMQ默认预取数量如下:

转发模式 队列类型 prefetchSize
PERSISTENT Queue 1000
NON_PERSISTENT Queue 1000
PERSISTENT Topic 100
NON_PERSISTENT Topic 32766

注意

如果预取数量设置为1,将会导致消息一条一条的推送。如果预取数量被设置为0,将会导致关闭Broker的推送功能,需要消费者主动拉取数据。

prefetchk可以通过如下方式 设置:

  1. 通过URL
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
或
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
  1. 通过ActiveMQPrefetchPolicy策略对象修改
ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(200);
  1. 通过Properties属性设置
Properties properties = connectionFactory.getProperties();
properties.setProperty("prefetchPolicy.queuePrefetch","1000");
8.2 慢消费处理

生产者在发送消息,Broker在将消息转发给消费者时,如果发现其内部有大量消息没有消费完成,那么Broker就会认为该消费者是慢消费者。在队列模式下,如果已发送,但没有确认的消息数量大于 prefetchSize,则消费者会被标记为 Slow。在主题模式下,如果cacheLimit已满,但是向主题的订阅者要发送的消息大于 prefetchSize,那么订阅者将被标记为 Slow。

由于慢消费者会导致消息堆积,消耗内存,从而导致内存数据和磁盘文件不停交换,消耗磁盘IO。同时还会影响生产者生成消息的速率。ActiveMQ使用等待消息限制策略(Pending Message Limit Strategy)来解决这个问题,当超过这个上限后有新消息到来时将根据不同的策略抛弃。

  1. 等待消息限制

目前等待消息限制策略有以下两种:

limit 可以设置0, > 0, -1三种方式:0表示:不额外的增加其预存大小,> 0表示:在额外的增加其预存大小,-1表示:不增加预存也不丢弃旧的消息,这个策略使用常量限制,配置如下:

<constantPendingMessageLimitStrategy limit="50"/>

这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:

 <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

注意

在以上两种方式中,如果设置了0,意味着除了prefetch之外不再缓存消息,如果设置了-1意味着禁止丢弃消息。

  1. 消息丢弃

目前消息丢弃策略有三种:

示例:

<policyEntry topic="PRICES.>"> 
            <!-- lets force old messages to be discarded for slow consumers -->  
            <pendingMessageLimitStrategy> 
              <constantPendingMessageLimitStrategy limit="10"/> 
            </pendingMessageLimitStrategy>  
            <!-- 10 seconds worth -->  
            <messageEvictionStrategy> 
              <uniquePropertyMessageEvictionStrategy propertyName="STOCK"/> 
            </messageEvictionStrategy> 
</policyEntry>

9 Subscription Recovery Policy(订阅恢复策略)

生产者在某个topic发送了多条消息后,这个时候非持久订阅者才订阅,那么它是不能获取之前生产者发送的信息的。或者,由于网络问题,非持久类型的消费者处于非活跃状态,无法接收到生产者发送的消息。使用消息恢复策略,可以解决上面的问题。ActiveMQ目前支持一个定时或固定大小的恢复缓冲区,在你连接到broker后,在一段时间内的消息会重新发送给订阅者。

Policy Name Sample Configuration Description
FixedSizedSubscriptionRecoveryPolicy <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/> 保留固定字节的消息。
FixedCountSubscriptionRecoveryPolicy <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> 保留固定数量的消息。
LastImageSubscriptionRecoveryPolicy <lastImageSubscriptionRecoveryPolicy/> 保留最后一条记录。
NoSubscriptionRecoveryPolicy <noSubscriptionRecoveryPolicy/> 禁用回溯,这是默认配置。
QueryBasedSubscriptionRecoveryPolicy <queryBasedSubscriptionRecoveryPolicy query="JMSType = 'car' AND color = 'blue'"/> 根据查询机制使用回溯。
TimedSubscriptionRecoveryPolicy <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> 保留指定时间内的消息。
RetainedMessageSubscriptionRecoveryPolicy <retainedMessageSubscriptionRecoveryPolicy/> 保留ActiveMQ.Retain属性值为true的最后1条消息。

示例:

<policyEntry topic=">">
    <subscriptionRecoveryPolicy>
        <timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
    </subscriptionRecoveryPolicy>
</policyEntry>

注意

需要设置retroactive属性为true。如下:

Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);

JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息 - hapjin - 博客园
ActiveMQ

上一篇 下一篇

猜你喜欢

热点阅读