待整理

ActiveMQ的非持久化订阅的思考

2017-08-24  本文已影响80人  MisterCH

项目中用到了大量的client,由于client和AMQ相连,AMQ对大量queue的支持不是很好,所以在项目中用到了发布/订阅+消息选择器的方法来进行针对某个或某几个client的消息分发。

所谓的发布/订阅,指的就是多个client同时订阅一个TOPIC,生产者往这个TOPIC里发送数据的时候,所有的client都能接收到相同的消息。形式很像广播,所以在消息中间件里,这种模式也叫广播模式。

在AMQ遵循的JMS规范中,广播模式有两种形式:持久化订阅和非持久化订阅。

项目里用到的是非持久化订阅,设计时考虑的主要原因有两个:

  1. 持久化订阅极度消耗资源,因为agent的数量比较多,所以一旦为每个消费者保留消息,如果出现消费者再也不上线的情况,消息就会永远保存在服务器上。AMQ的官网上也提到了这种情况:

Durable topic subscribers that are offline for a long period of time are usually not desired in the system. The reason for that is that broker needs to keep all the messages sent to those topics for the said subscribers. And this message piling can over time exhaust broker store limits for example and lead to the overall slowdown of the system.

  1. 非持久化订阅的前期是agent在线,在设计时考虑agent一定要上线才能执行任务,对agent异常的场景考虑不足。

最近项目里几个人都问我AMQ能不能实现为离线agent保留消息,其实是可以的,也是最近才发现的功能,目前想了以下几种方法:

1. 使用持久化订阅

AMQ的官方既然提到了持久化订阅的问题,自然也考虑了一下解决方法。在官方提供的方法里,可通过设置消息的有效期自动清理离线的消费者的方法来降低系统负载。

但是这个方案有两个问题还待测试:

  1. 我们使用的是AMQ集群,持久化订阅是否会在集群内生效还不确定。因为每一次Agent重连时,连接的都可能是不同的MQ,持久化订阅的配置会否通过集群传播至所有的MQ呢?
  2. 平均两千个持久化订阅一台4C8G的AMQ能不能抗住,也需要测试。
  3. 持久化订阅要求Agent至少上线过一次,也就是在MQ上注册过一次,因此如果Agent在离线期间出现了clientID变化,MQ会认其为新的消费者,不会把离线消息发给它。

2. 使用AMQ的Retroactive Consumer功能

翻译是“可追溯”消费者,该功能只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,非持久化订阅不可能获取实例创建之前的消息,因为服务器根本不知道它的存在。对于服务器而言,如果一个Topic通道创建,且有发布者发布消息,那么服务器将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是持久化订阅,它可以获取最近发布的一些消息;如果是持久化订阅,它可以获取存储器中尚未删除的所有的旧消息。

“可回溯”消费者与clientID无关,就算离线时clientID变化也不会影响到消息的回溯。

此外,AMQ支持几种消息回溯的方法:

  1. 保留一定大小的消息
  2. 保留一定条数的消息
  3. 保留一定时间的消息
  4. 只保留一条消息
  5. 只保留消息属性中带上特定字段的消息

这个功能有一个问题,貌似不能保证所有满足条件的消息都被发送至“可追溯”消费者。且在集群模式下也有一定的问题,也是待测试的方案。

5:00pm更新:基本功能验证成功,可以收到离线前的消息,试了保留100条消息和保留1条消息都成功了,生产者和消费者在同一个集群不同机器也可以成功。但集群有个问题,消费者会收到多份同样的消息。消息数量与集群中服务器数量相同。比如3台mq的集群,设置保留10条消息,那新的agent上线就会收到3*10条消息,需要应用自己判断。

如果出现了agent大量掉线重连的场景,对网络带宽应该也有冲击。

3.由应用封装一层任务重新获取机制

在保持原有的代码不变的情况下,考虑在一个缓存(比如redis)设置一个有序集合,每次agent上线时可从该缓存中取出上一条消息或前几条消息并再次处理。

= =对架构冲击比较大,估计不靠谱。

上一篇 下一篇

猜你喜欢

热点阅读