ActiveMQ之Dispatch Message

2021-01-08  本文已影响0人  爱健身的兔子

1 Message Cursors(消息指针)

在使用非持久消息传递时,ActiveMQ的早期版本中的一个常见问题是RAM缓存区不足。

ActiveMQ 5.0之前的版本在内存中保留了对所有可能分发给活动的“持久主题使用者”或“队列”的消息的引用。尽管引用本身并不大,但确实对可以挂起的最大消息数量施加了限制。

消息系统分发持久消息的一种典型方法是,当客户端准备使用它们时,使用游标保持下一个分配位置,从长期存储中批量提取它们。这是一种健壮且可扩展性强的方法,但是对于消费者(一个或多个)可以跟上消息的产生者(一个或多个)的情况,并不是最有效的方法。

ActiveMQ 5.0采用了一种混合方法,允许消息直接从生产者传递到使用者(在消息保留之后),但是如果消费者慢下来,则切换回使用游标。

当消息的消费者处于活跃状态并且处理能力较强时,被持久化存储的消息直接发送到与消费者相关联的发送队列,如下:

当消息已经出现积压,消费者再开始活跃,或者是消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,并发送与消费者关联的发送队列。见下图:

1.1 游标类型

ActiveMQ 5.0中的默认消息光标类型是基于存储的。它的行为如上所述。可以使用两种其他类型的游标:VM游标基于文件的游标。同时支持非持久消息的处理,Store-based内嵌了File-based的模式,非持久消息直接被Non-persistent pending Cursor所处理。工作模式见下图:

1.2 VM游标

VM Cursor是ActiveMQ 4.x的工作方式:对消息的引用保存在内存中,并在需要时传递给调度队列。这可能非常快,但也有一个缺点,就是无法处理非常慢的消费者或长时间不活动的消费者。

1.3 基于文件的游标

基于文件的游标源自VM游标。当代理中的内存达到其限制时,它可以将消息分页到磁盘上的临时文件:

1.4 配置使用

默认情况下,使用基于存储的游标,但是可以根据目标配置不同的游标。

  1. Topic subscribers
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <deadLetterStrategy>
              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
            </deadLetterStrategy>
            <pendingSubscriberPolicy>
                <vmCursor />
            </pendingSubscriberPolicy>
            <pendingDurableSubscriberPolicy>
                <vmDurableCursor/>
            </pendingDurableSubscriberPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>

说明

有效的Subscriber类型是vmCursorfileCursor,缺省是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursorvmDurableCursorfileDurableSubscriberCursor,缺省是store based cursor。

  1. Queue
<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue="org.apache.>" >
                    <deadLetterStrategy>
                        <individualDealLetterStrategy  topicPrefix="Test.DLQ"/>
                    </deadLetterStrategy>
                    <pendingQueuePolicy>
                        <vmQueueCursor/>
                    </pendingQueuePolicy>
                </policyEntry>
              </policyEntries>
            </policyMap>
</destinationPolicy>

2 Async Send(异步发送)

ActiveMQ支持异步和同步发送消息,是可以配置,通常对于快的消费者,是直接把消息同步发送过去,但是对于一个慢消费者,你使用同步发送消息可能出现producer堵塞等现象,慢消费者适合异步发送。

配置使用:

  1. ActiveMQ默认设置dispatchAsync=true是最好的性能设置。如果你处理Fast Consumer则使用dispatchAsync=false

  2. 在Connection URI级别来配置使用Async Send

factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
  1. 在ConnectionFactory级别来配置使用Async Send
factory.setUseAsyncSend(true);(此方法是存在于ActiveMQConnectionFactory中的)
  1. Connection级别来配置使用Async Send
connection.setUseAsyncSend(true); (此方法存在于ActiveMQConnection中)

3 Optimized Acknowledgement(消息确认机制)

ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:

  1. 在Connection URI 上禁止启用Optimized Acknowledgements。
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
  1. 在ConnectionFactory 上禁止启用Optimized Acknowledgements。
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
  1. 在Connection上禁止启用Optimized Acknowledgements。
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

4 Dispatch Policies(分发策略)

ActiveMQ的prefetch缺省参数,是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大,而且缺省的dispatche policies会尝试尽可能快的填满缓冲。

然而有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个边的consumer上,这样就会因为负载的不均衡而导致处理时间的增加。

对于队列,您可以定义是否以round-robin循环方式(默认行为)进行调度,还是在调度过程选择下一个消费者(strictOrderDispatch)之前是否耗尽一个消费者的预取缓冲区。

4.1 Queue配置

queue配置成 strictOrderDispatch 模式。

<policyEntry queue=">" strictOrderDispatch="false" />

注意

从5.14.0版开始-当有一个使用者时,strictOrderDispatch = true(round-robin) 选项将确保重新分发的消息的严格顺序。

4.2 Topic配置

Round Robin dispatch policy会尝试平均分发消息:

<policyEntry topic="FOO.>">
    <dispatchPolicy>
        <roundRobinDispatchPolicy/>
    </dispatchPolicy>
</policyEntry>    

5 Producer Flow Control(生产者流量控制)

流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞知道资源可以继续使用,或者抛出一个JMSException,可以通过来配置。 值得注意的是,默认<systemUsage>设置会在达到或限制时导致生产者阻塞

ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);

ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前,能够发送消息的最大字节数。

5.1 禁用 producer flow control

可以通过producerFlowControl在Broker配置中将适当的目标策略上的标志设置为false来对代理上的特定JMS队列和主题禁用流控制。

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>" producerFlowControl="false"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

注意

自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。

如果想把所有的非持久化消息存放到内存中,并在达到内存限制的时候停掉生产者,你需要配置,示例如下:

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">    
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>

上面的片段将确保所有非持久队列消息都保留在内存中,每个队列的限制为1Mb。

5.2 配置客户端异常

为了应对broker空间不足,而导致的不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常,通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端,配置示例:

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

此属性的优点是客户端可以捕获javax.jms.ResourceAllocationException,稍等片刻然后重试该send()操作,而不仅仅是无限期地挂起。

从版本5.3.1开始,sendFailIfNoSpaceAfterTimeout已添加该属性。此属性会导致send()操作失败,并在客户端发生异常,但仅在等待给定的时间后才会失败。如果在配置的时间后仍未释放代理上的空间,则send()客户端操作才会失败。下面是一个示例:

<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

超时以毫秒为单位定义,因此以上示例在send()操作失败之前要等待三秒钟。此属性的优点是,它将在配置的时间量内阻塞,而不是立即失败或无限期阻塞。此属性不仅在代理方面提供了改进,而且还为客户端提供了改进,因此它可以捕获异常,稍等片刻然后重试该send()操作。

ActiveMQ

上一篇下一篇

猜你喜欢

热点阅读