ActiveMQ的高级特性

2020-12-31  本文已影响0人  爱健身的兔子

1 异步投递与确认签收回调

同步发送:会阻塞producer的send方法。

异步发送:不会阻塞producer的send方法。

消息的发送默认情况下是采用异步发送,除下面两种情况:

  1. 设置消息发送成同步发送。

  2. 发送消息是持久化消息。

消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。但是当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。

设置异步发送的方式:

  1. 在brokerURI添加jms.alwaysSyncSend=false&jms.useAsyncSend=true

  2. 通过ConnectionFactory接口的setUseAsyncSend方法设置。

  3. 通过Connection接口的setUseAsyncSend方法设置。

注意:

总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。

jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送。

producerWindowSize:窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,如果达到了producerWindowSize上限,即使是异步调用也会被阻塞,防止不停向broker发送消息。

通过ActiveMQConnectionFactory或者ActiveMQConnection的setProducerWindowSize来设置。

2 延迟投递与定时投递

​ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息

先在 activemq.xml 中配置 schedulerSupport 属性为 true

 <broker xmlns="http://activemq.apache.org/schema/core"  brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" /> 

通过在Message设置如下属性可以进行延时投递和定时投递

实例如下:

 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, periodTime);
 message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeatTimes);
 message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cron);

3 消费重试机制

消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。消息重发的情况有以下几种:

  1. 事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发。

  2. 使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发。

  3. 所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发。

  4. 消息被消费者拉取之后,超时没有响应ack,消息会被broke重发。

在默认情况下,当消息签收失败时ActiveMQ消息服务器会继续每隔1秒钟向消费者端发送一次这个签收失败的消息,默认会尝试6次(加上正常的1次共7次),如果这7次消费者端全部签收失败,则会给ActiveMQ服务器发送一个“poison ack”,表示这个消息不正常(“有毒”),这时消息服务器不会继续传送这个消息给这个消费者,而是将这个消息放入死信队列(DLQ,即Dead Letter Queue)。

3.1 消息重试属性

消费重试机制的默认相关配置如下:
可用的Redelivery属性如下:

property     default value     description
collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
maximumRedeliveries 6 最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay -1 传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay 1000L 初始重发延迟时间
redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效。
useCollisionAvoidance false 启用防止冲突功能。
useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。
3.2 消息重试配置

消息重试有三种配置:url,xml,connection连接属性。

URL

tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000

XML

<broker schedulerSupport="true">

        <plugins>
            <redeliveryPlugin fallbackToDeadLetter="true" 
                              sendToDlqIfMaxRetriesExceeded="true">
                <redeliveryPolicyMap>
                    <redeliveryPolicyMap>
                        <redeliveryPolicyEntries>
                            <!-- a destination specific policy -->
                            <redeliveryPolicy queue="SpecialQueue" 
                                              maximumRedeliveries="4" 
                                              redeliveryDelay="10000"/>
                        </redeliveryPolicyEntries>

                        <defaultEntry>
                            <!-- the fallback policy for all other destinations -->
                            <redeliveryPolicy maximumRedeliveries="4" 
                                              initialRedeliveryDelay="5000"
                                              redeliveryDelay="10000"/>
                        </defaultEntry>
                    </redeliveryPolicyMap>
                </redeliveryPolicyMap>
            </redeliveryPlugin>
        </plugins>

</broker> 

必须开启 schedulerSupport 属性。

connection

        //重发策略
        RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
        queuePolicy.setInitialRedeliveryDelay(0);
        queuePolicy.setRedeliveryDelay(1000);
        queuePolicy.setUseExponentialBackOff(false);
        queuePolicy.setMaximumRedeliveries(2);

        //创建队列(有则不创建)
        Destination destination = session.createQueue("garine-queue");

        RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
        map.put((ActiveMQDestination) destination, queuePolicy);

4 死信队列

ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念,在一条消息被重复发送给消息消费者端多次(默认为6次)后,若一直签收不成功,则ActiveMQ会将这条消息移入到“死信队列”。开发时可以开启一个后台线程监听这个队列(默认死信队列的名称为ActiveMQ.DLQ)中的消息,进行人工干预,也就是说死信队列的作用主要是处理签收失败的消息。

死信队列的配置主要有两种:SharedDeadLetterStrategy和IndividualDeadLetterStrategy

  1. SharedDeadLetterStrategy:共享的死信队列配置策略,将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ Broker端的默认策略。共享队列的名称默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定:在activemq.xml中的<policyentries>节点中配置</policyentries>
<deadLetterStrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
  1. IndividualDeadLetterStrategy:单独的死信队列配置策略,把DeadLetter放入各自的死信通道中。对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue”;对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic”。比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们可以使用queuePrefix和topicPrefix来指定上述前缀:
<!-- 仅对与order队列起作用 -->
<policyEntry queue="order">
    <deadLetterStrategy>        <!-- useQueueForQueueMessage属性的作用:是否将名为order的Topic中的DeadLetter也保存在该队列中,默认为true -->
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessage="false"/>
    </deadLetterStrategy>
</policyEntry>

注意:

默认情况下,无论是Topic还是Queue,Broker都使用Queue来保存DeadLetter,即死信通道通常为Queue,不过开发时也可以指定为Topic

4.1 自动删除过期消息

自动删除过期消息,此时对于过期的消息将不会被放入到死信队列,而是自动删除,>表示对所有队列起作用,processExpired表示是否将过期消息放入死信队列,默认为true

<!-- >表示对所有队列起作用 -->
<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false"/>
    </deadLetterStrategy>
</policyEntry>
4.2 存放非持久消息到死队列中

将签收失败的非持久消息也放入到死信队列,默认情况下,ActiveMQ不会把非持久化的死消息放入死信队列,processNonPersistent表示是否将非持久化消息放入死信队列,默认为false

<!-- >表示对所有队列起作用 -->
<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processNonPersistent="true"/>
    </deadLetterStrategy>
</policyEntry>

对于过期的,可以通过processExpired属性来控制,对于redelivered的失败的消息,需要通过插件来实现如下:丢弃所有死信 。

<broker>
    <plugins>
        <discardingDLQBrokerPlugin dropAll="true"
                dropTemporaryTopics="true" dropTemporaryQueues="true"/>
    </plugins>
</broker> 

5 防止消息重复消费,幂等性

ActiveMQ中的消息有时是会被重复消费的,而我们消费消息时大都会在拿到消息后去调用其他的方法,比如说将消息的内容解析为一个对象保存到数据库中。一旦发生消息的重复消费时就会重复保存,这是有问题的,因此我们需要考虑如何防止重复调用。其实我们是没有办法防止重复调用的,只能在重复调用时进行消息是否重复消费的校验,当然对于幂等性接口也可以不进行校验。 那如何进行校验呢?有很多种方式,比如说我们将消费过的消息的messageId保存到数据库,每次消费消息前先到数据库中查一下该消息是否已被消费。在分布式系统中,也可以将消费过的消息放入redis中,以messageId作为key,message对象作为value(其实value不重要,当然也要看需求本身),在消费消息时先从redis中查找该消息是否已被消费。

上一篇下一篇

猜你喜欢

热点阅读