ActiveMQ的高级特性
1 异步投递与确认签收回调
同步发送:会阻塞producer的send方法。
异步发送:不会阻塞producer的send方法。
消息的发送默认情况下是采用异步发送,除下面两种情况:
-
设置消息发送成同步发送。
-
发送消息是持久化消息。
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。但是当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
设置异步发送的方式:
-
在brokerURI添加
jms.alwaysSyncSend=false&jms.useAsyncSend=true
。 -
通过ConnectionFactory接口的
setUseAsyncSend
方法设置。 -
通过Connection接口的
setUseAsyncSend
方法设置。
注意:
-
如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步。
-
当alwaysSyncSend=false,useAsyncSend=false时;NON_PERSISTENT(非持久化)消息和事务中的消息将使用异步发送。
-
当alwaysSyncSend=false时,如果指定了useAsyncSend=true;PERSISTENT(持久化)类型的消息使用异步发送。如果useAsyncSend=false,PERSISTENT类型的消息使用同步发送。
总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。
jms.sendTimeout
:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送。
producerWindowSize
:窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,如果达到了producerWindowSize上限,即使是异步调用也会被阻塞,防止不停向broker发送消息。
通过ActiveMQConnectionFactory或者ActiveMQConnection的setProducerWindowSize
来设置。
2 延迟投递与定时投递
ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息
先在 activemq.xml 中配置 schedulerSupport 属性为 true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" />
通过在Message设置如下属性可以进行延时投递和定时投递

实例如下:
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, periodTime);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeatTimes);
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cron);
3 消费重试机制
消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。消息重发的情况有以下几种:
-
事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发。
-
使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发。
-
所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发。
-
消息被消费者拉取之后,超时没有响应ack,消息会被broke重发。
在默认情况下,当消息签收失败时ActiveMQ消息服务器会继续每隔1秒钟向消费者端发送一次这个签收失败的消息,默认会尝试6次(加上正常的1次共7次),如果这7次消费者端全部签收失败,则会给ActiveMQ服务器发送一个“poison ack”,表示这个消息不正常(“有毒”),这时消息服务器不会继续传送这个消息给这个消费者,而是将这个消息放入死信队列(DLQ,即Dead Letter Queue)。
3.1 消息重试属性
消费重试机制的默认相关配置如下:
可用的Redelivery属性如下:
property | default value | description |
---|---|---|
collisionAvoidanceFactor |
0.15 |
设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。 |
maximumRedeliveries |
6 |
最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 |
maximumRedeliveryDelay |
-1 |
传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
initialRedeliveryDelay |
1000L |
初始重发延迟时间 |
redeliveryDelay |
1000L |
重发延迟时间,当initialRedeliveryDelay=0时生效。 |
useCollisionAvoidance |
false |
启用防止冲突功能。 |
useExponentialBackOff |
false |
启用指数倍数递增的方式增加延迟时间。 |
backOffMultiplier |
5 |
重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。 |
3.2 消息重试配置
消息重试有三种配置:url,xml,connection连接属性。
URL
tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000
XML
<broker schedulerSupport="true">
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true"
sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!-- a destination specific policy -->
<redeliveryPolicy queue="SpecialQueue"
maximumRedeliveries="4"
redeliveryDelay="10000"/>
</redeliveryPolicyEntries>
<defaultEntry>
<!-- the fallback policy for all other destinations -->
<redeliveryPolicy maximumRedeliveries="4"
initialRedeliveryDelay="5000"
redeliveryDelay="10000"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
</broker>
必须开启 schedulerSupport 属性。
connection
//重发策略
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
//创建队列(有则不创建)
Destination destination = session.createQueue("garine-queue");
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put((ActiveMQDestination) destination, queuePolicy);
4 死信队列
ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念,在一条消息被重复发送给消息消费者端多次(默认为6次)后,若一直签收不成功,则ActiveMQ会将这条消息移入到“死信队列”。开发时可以开启一个后台线程监听这个队列(默认死信队列的名称为ActiveMQ.DLQ)中的消息,进行人工干预,也就是说死信队列的作用主要是处理签收失败的消息。
死信队列的配置主要有两种:SharedDeadLetterStrategy和IndividualDeadLetterStrategy
- SharedDeadLetterStrategy:共享的死信队列配置策略,将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ Broker端的默认策略。共享队列的名称默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定:在activemq.xml中的<policyentries>节点中配置</policyentries>
<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
- IndividualDeadLetterStrategy:单独的死信队列配置策略,把DeadLetter放入各自的死信通道中。对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue”;对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic”。比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们可以使用queuePrefix和topicPrefix来指定上述前缀:
<!-- 仅对与order队列起作用 -->
<policyEntry queue="order">
<deadLetterStrategy> <!-- useQueueForQueueMessage属性的作用:是否将名为order的Topic中的DeadLetter也保存在该队列中,默认为true -->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessage="false"/>
</deadLetterStrategy>
</policyEntry>
注意:
默认情况下,无论是Topic还是Queue,Broker都使用Queue来保存DeadLetter,即死信通道通常为Queue,不过开发时也可以指定为Topic
4.1 自动删除过期消息
自动删除过期消息,此时对于过期的消息将不会被放入到死信队列,而是自动删除,>表示对所有队列起作用,processExpired表示是否将过期消息放入死信队列,默认为true
<!-- >表示对所有队列起作用 -->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false"/>
</deadLetterStrategy>
</policyEntry>
4.2 存放非持久消息到死队列中
将签收失败的非持久消息也放入到死信队列,默认情况下,ActiveMQ不会把非持久化的死消息放入死信队列,processNonPersistent表示是否将非持久化消息放入死信队列,默认为false
<!-- >表示对所有队列起作用 -->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true"/>
</deadLetterStrategy>
</policyEntry>
对于过期的,可以通过processExpired属性来控制,对于redelivered的失败的消息,需要通过插件来实现如下:丢弃所有死信 。
<broker>
<plugins>
<discardingDLQBrokerPlugin dropAll="true"
dropTemporaryTopics="true" dropTemporaryQueues="true"/>
</plugins>
</broker>
5 防止消息重复消费,幂等性
ActiveMQ中的消息有时是会被重复消费的,而我们消费消息时大都会在拿到消息后去调用其他的方法,比如说将消息的内容解析为一个对象保存到数据库中。一旦发生消息的重复消费时就会重复保存,这是有问题的,因此我们需要考虑如何防止重复调用。其实我们是没有办法防止重复调用的,只能在重复调用时进行消息是否重复消费的校验,当然对于幂等性接口也可以不进行校验。 那如何进行校验呢?有很多种方式,比如说我们将消费过的消息的messageId保存到数据库,每次消费消息前先到数据库中查一下该消息是否已被消费。在分布式系统中,也可以将消费过的消息放入redis中,以messageId作为key,message对象作为value(其实value不重要,当然也要看需求本身),在消费消息时先从redis中查找该消息是否已被消费。