(7)消息的确认和重发机制

2018-12-16  本文已影响0人  Mrsunup

1.消费端的 PrefetchSize

还记得我们在分析消费端的源码的时候,所讲到的 prefetchsize 吗?
可以参考上一篇的博客:https://www.jianshu.com/p/f5016f845d16

可以参考官网:http://activemq.apache.org/what-is-the-prefetch-limit-for.html

activemq 的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口大小。不同的类型的队列,prefetchSize 的默认值也是不一样的

通过上面的例子,我们基本上应该知道 prefetchSize 的作用了,消费端会根据prefetchSize 的大小批量获取数据,比如默认值是 1000,那么消费端会预先加载 1000 条数据到本地的内存中。

Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10");

既然有批量加载,那么一定有批量确认,这样才算是彻底的优化

ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化ACK”,只有在为 true 的情况下,prefetchSize 以及optimizeAcknowledgeTimeout 参数才会有意义,优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认 ack 了 0.65*prefetchSize 个消息才确认),broker 再次发送消息时又可以批量发送
如果只是开启了 prefetchSize,每条消息都去确认的话,broker 在收到确认后也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK来手动延迟确认, 我们需要在 brokerUrl 指定 optimizeACK 选项

ConnectionFactory connectionFactory= new ActiveMQConnectionFactory
("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&
jms.optimizeAcknowledgeTimeOut=10000");

注意,如果 optimizeAcknowledge 为 true,那么 prefetchSize 必须大于 0. 当 prefetchSize=0 的时候,表示 consumer 通过 PULL 方式从 broker 获取消息

到目前为止,我们知道了 optimizeAcknowledge 和 prefetchSize 的作用,两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销

需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升 consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的。因为过大的prefetchSize 不利于 consumer 端消息的负载均衡。因为通常情况下,我们都会部署多个 consumer 节点来提升消费端的消费性能。这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确认时,client 端发生故障,那么这些消息就有可能会被重新发送给其它consumer,那么这种风险就需要 client 端能够容忍“重复”消息

2.消息的确认过程

通过前面的源码分析,基本上已经知道了消息的消费过程,以及消息的批量获取和批量确认,那么接下来再了解下消息的确认过程
消息确认有四种 ACK_MODE,分别是:
AUTO_ACKNOWLEDGE = 1 自动确认
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
SESSION_TRANSACTED = 0 事务提交并确认

虽然 Client 端指定了 ACK 模式,但是在 Client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进行不同的操作

DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker 端可以删除消息了
POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 DLQ(死信队列)
REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息
INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何 ACK_MODE 下
UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。

Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定此消息的后续操作

3.消息的重发机制原理

在正常情况下,有几中情况会导致消息重新发送

1.在事务性会话中,没有调用 session.commit 确认消息或者调用session.rollback 方法回滚消息
2.在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有调用 acknowledge 或者调用了 recover 方法;

一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会给 broker 发送一个”poison ack”(ActiveMQMessageConsumer#dispatch:1460 行),表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。

ActiveMQ 中默认的死信队列是 ActiveMQ.DLQ,如果没有特别的配置,有毒的消息都会被发送到这个队列。默认情况下,如果持久消息过期以后,也会被送到 DLQ 中

死信队列配置策略 :
缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理,可以通过individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略来进行修改

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
// “>”表示对所有队列生效,如果需要设置指定队列,则直接写队列名称
<policyEntry queue=">">
<deadLetterStrategy>
//queuePrefix:设置死信队列前缀
//useQueueForQueueMessage 设置队列保存到死信。
    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

自动丢弃过期消息 :

<deadLetterStrategy>
    <sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>

死信队列的再次消费:

当定位到消息不能消费的原因后,就可以在解决掉这个问题之后,再次消费死
信队列中的消息。因为死信队列仍然是一个队列

上一篇 下一篇

猜你喜欢

热点阅读