RocketMQ源码分析(六)消息的拉取
几个基本概念先说下
消息的消费有两种模式:集群模式和广播模式
集群模式,一条消息只允许被一个消费者消费。
广播模式,一条消息会发送给所有订阅该topic的用户消费。
RocketMQ的消息消费是基于拉模式,一个任务拉取完后再拉。
RocketMQ支持顺序消费,只要保证是同一个队列即可;但是要实现全局顺序消费,可以将该Topic的queue设为1,但是这样会牺牲一定的高可用性。
下面我们来说说消费者的启动流程,DefaultMQPushConsumerImpl的start方法,他做了以下几件事(不贴源码了,跟生产者挺像的):
- 订阅主题
- 初始化MQClient等类
- 初始化消息进度
- 根据是否是顺序消费,创建消费端消费线程服务
- 想MQClientInstance注册消费者。
关于如何拉取任务,主要是通过PullMessageService去实现,它会从 pullRequestQueue 中获取一个PullRequest 消息拉取任务,如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入 。然后调用 pullMessage 方法进行消息拉取 。
image.png
其中ProcessQueue相当于MessageQueue的快照,一个在消费端的存储容器,PullMessageService会将ProcessQueue提交到线程池去消费,然后把消息从ProcessQueue移除。
消息拉取的具体步骤比较复杂,借用大佬的流程图:
image.png
值得一提的是,其实RocketMQ没有实现真正的推模式,而是消费者主动向服务器拉取消息(没想到吧),它是通过一个轮询机制来保证的,其中轮询又分为短轮询和长轮询。
在Broker中可以通过longPollingEnable来开启长轮询机制。
如果不启用长轮询机制,则会在服务端等待 shortPollingTimeMills 时间后(挂起)再去判断消息是否已到达消息队列,如果消息未到达则提示消息拉取客户端 PULL_NOT_FOUND (消息不存在),如果开启长轮询模式, RocketMQ 一方面会每5s 轮询检查一次消息是否可达 , 同时一有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息。
但其实这样,效率还是不太行,所以RocketMQ还有一个唤醒机制,如果消息到达CommitLog,服务端会唤醒消费者;然后消费者对比和服务端的偏移量,决定是否去拉消息。
关于消息队列重负载,MQ每隔20s会重新负载一次消费者,以保持平衡,所以在下一次重负载之前,可能会出现新加入消费者丢失的问题。