消息队列常见问题
消息队列缺点
- 系统可用性降低:加入消息队列,当消息队列出问题,将会导致系统不可用,系统可用性会降低
- 系统复杂性增加:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等,系统复杂性增加。
- 一致性问题:多个消费者时,会引发数据一致性的问题。
应用场景分析
异步处理
- 传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间。
-
中间件模式的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快相应速度
异步处理
应用解耦
- 传统模式的缺点:系统间耦合性太强,新的订阅者加入还需要发布者修改代码
-
中间件模式的优点:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而发布者不需要做任何修改。
应用解耦
流量削锋
- 传统模式的缺点:并发量大的时间,所有的请求直接怼到数据库,造成数据库连接异常
-
中间件模式的优点:系统慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
流量削锋
消息队列模型
P2P/点对点模式(Queue;消息队列,发送者,接收者)
- 每个消息只有一个消费者,消费后消息将不在队列中
- 发送者与接收者没有时间依赖
- 接收者接收消息后需向队列应答
发布订阅模式(Pub/Sub;主题,发布者,订阅者)
- 每个消息可以有多个消费者
- 发布者和订阅者有时间依赖,必须订阅后才能收到消息
- 为了消费消息,订阅者必须保持运行状态
消息队列选型
转自
ActiveMQ早期用的比较多,但是现在貌似用的都不是很多了,网上也没有大规模吞吐量的使用案例分析,社区也貌似不是很活跃了,如果是新项目不建议采用ActiveMQ。
RabbitMQ现在使用的较为多一些,社区活跃度也很高,功能也很强大,官方还提供了管理的web界面,性能也很好,但是RabbitMQ性能好的主要原因是因为使用erlang语言开发的,erlang语言貌似天生性能好,但对于我们java开发者来说,源码基本看不懂,更别提深入的研究了,不过spring推出了rabbit的支持,貌似还比较好用,比自己去封装实现并且去处理一些问题的要好多了。
RocketMQ现在开始用的人也比较多,很多人对于RocketMQ的看法是集成了Kafka和RabbitMQ的有点,是阿里开源的产品,貌似现在是捐赠给了Apache,其源码是java写的,功能十分强大并且是经过阿里大规模应用的,能经过阿里实践使用的一般来说可靠性和可用性都是相当高的,但是也存在一些小问题,现在RocketMQ虽然使用的人好像越来越多了,但是文档资料还是比较少,含金量不怎么高,并且阿里开源的有不维护的风险,就像dubbo中间也用2年没维护,有实力的团队应该没有什么问题,小公司小团队需要考虑一下使用RocketMQ。
Kafka就不多说了,Kafka可以说是业内标准,基本上大数据领域的实时计算、日志、数据处理都是用kafka,开源社区异常活跃,而且像现在阿里云、腾讯云都推出了Kafka的云服务,所以说Kafka就不说了,绝对没问题,放心大胆的用吧。
如何保证消息队列是高可用的
kafka RocketMQ RabbitMQ如何保证消息不被重复消费
正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发出的确认消息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offet的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
造成重复消费的原因,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
解决方案:
(1)比如,你拿到这个消息做数据库的insert操作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
(3)如果上面两种情况还不行。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可。
如何保证消息的可靠性传输
生产者丢数据
支持事务的队列,如RabbitMQ,可以开始事务,但是会造成吞吐量降低
消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
如何持久化
- 将queue的持久化标识durable设置为true,则代表是一个持久的队列
- 发送消息的时候将deliveryMode=2
消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除,这种情况下,如果消费者出现异常而未能处理消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。
如何保证消息的顺序性
- rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
- kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可。
如何解决消息队列的数据积压
- 设立过期时间,直接丢弃数据
- 恢复消费者,临时扩容,快速消费