我爱编程程序员

Rocketmq实现简析

2018-11-28  本文已影响18人  msrpp

本来想将broker和client分开写。但是他们的每个功能都是共同协作完成的,写broker的时候,难免会涉及到consumer和producer的细节,于是以大杂烩的方式粗略总结了rocketmq的主要功能,主要是broker。

一、nameSvr

RocketMq是Metaq的3.0版本,摒弃了之前使用的zk,用以新的命名服务namesrv,namesrv的代码量比较少,我们从这里开始我们的源码分析路程。

可以看到Main-ClassNamesrvStartup除了读取配置,初始化日志以外,主要功能是由NamesrvController完成的。我们进一步查看NamesrvController类,发现其是有以下几个模块构成:RemotingServer,KVConfigManager,RouteInfoManager

1.2 namesrv的内部组件

1.2.1 KVConfigManager

KVConfigManager为broker提供namespace,K,V 的双层kv存储/读取功能。当发生数据变更时,会实时刷盘。内部用读写锁来控制并发操作。

1.2.2 NettyRemotingServer

提供对外消息接收处理服务。NettyRemotingServer继承自NettyRemotingAbstract,后者对netty进行了封装,抽象出server和client的公共部分。server的实现即为NettyRemotingServernamesrv通过其registerDefaultProcessor方法注册了消息处理对象DefaultRequestProcessor。处理如下消息:

具体实现根据功能转由KVConfigManagerRouteInfoManager模块处理。

1.2.3 RouteInfoManager

路由信息管理,namesrv的核心模块。维护以下列表的增删改查

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

1.3 定时任务scheduledExecutorService

处理两个事情:

二、broker的组件

三、一条producer发来的消息处理流程

2.1 正常流程

2.2 处理失败的消息

consumer对于消费失败的消息,可以在MessageListenerConcurrently#consumeMessage处理的时候抛出异常或者返回RECONSUME_LATER来标示消息处理失败。对于失败的消息将会把原始消息的offset发回给broker。
broker在commitlog中找到原始的消息内容,取出来并投递到新的retry topic(名称%RETRY%+consumerGroupName)中。这里有两个关键点:根据延迟level指数退避先投递到延迟队列,如果最后retry topic依然消费失败。那么进入死信队列(名称%DLQ%+consumerGroupName)。

四、producer和consumer的工作流程

在了解消费流程之前,我们先看一下client(producer和consumer)的工作流程。

producer和consumer的工作流程大致是一致的,但是也有差异部分。详见MQClientInstance#start

3.1 公共部分

3.2 consumer独有

其实producer也有启动相应线程,但是没有触发条件,无法执行逻辑。

3.2.1. RebalanceService服务,每10s执行一次,对于每个消费组的每个topic,从broker获取到consumer同胞,然后根据负载算法均摊所有的队列。broker可以控制每个topic队列的多少来完成带权重的消息负载,producer可以通过指定发送的队列来实现权重生产。consumer如果要实现类似功能,可以调用setAllocateMessageQueueStrategy修改rocketmq的负载策略。

3.2.2. PullMessageService服务,负责发起拉取消息的任务。RebalanceService服务调用抽象方法RebalanceImpl#dispatchPullRequest将新增的broker队列分发出去,其中pushconsumer的实现RebalancePushImpl会调用PullMessageService的接口向目标broker发起拉取消息的请求。concumer从namesrv中获取同组同topic的消费者,每个消费者分配不重复的队列,所以具体使用的时候,消费者的数量应该要大于队列的数量是没有意义的。具体实现是rebalanceByTopic#rebalanceByTopic

在上文<<一条producer发来的消息处理流程>>也有说过,消息有可能会消费失败,消费失败的消息最后都进了%RETRY%consumerGroupName队列,因此消费者在消费的时候,除了订阅自己负责的topic,还需要订阅本消费组的retry队列。

五、一条push方式consumer发来的pull请求处理流程

consumer的拉取模式

对于push模式来说,rocketmq采用的却是pull的方式来获取消息。pull的间隔似乎决定了broker把消息推给consumer的延时,间隔太长,消息实时性无法保证,时间太短,徒增cpu和网络资源。但是rocketmq给出了一个比较好的解决方案。consumer对于分配到自己身上的每个broker的每个队列,在pull请求的时候给出一个挂起时间pollingTimeMills(第一次是由RebalanceService触发的,pollingTimeMills默认15s),如果对某个队列查询的结果是没有新消息,那么挂起pollingTimeMills时间,期间如果有新消息到来,调用brokerPullRequestHoldService#notifyMessageArriving来重新触发一次消息拉取返回给consumer,如果超时了也返回给consumer。consumer在接收到回复以后立即发起下一条查询。

producer发送同步消息如何实现?

每条消息有一个唯一标识opaque,发送一条消息前,创建一个ResponseFuture,ResponseFuture内部维护了一个计数为1的CountDownLatch对象,保存到上下文列表responseTable(类型:ConcurrentHashMap<Integer /* opaque */, ResponseFuture>)中, 在当前进程接收到消息后,先判断是请求还是回复,如果是回复则清除掉responseTable中的记录,并减少CountDownLatch的计数。用户在发送接口中等到CountDownLatch的结果就可以返回了。

顺序消息

顺序消息由以下两点来保证。

1.从producer到broker的顺序性,producer对于同一类顺序消息,选择同一个broker的同一个queue(调用SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) 接口来定制消息传往那个queue),tcp传输是有序的且broker中commitlog和consumer各队列消息的offset均是顺序读写。因此这点得到了保证。

2.消息从broker到consumer的有序性,同一个队列有且对应了一个consumer且从broker发送到consumer的时候是顺序的。此外consumer注册监听器为MessageListenerOrderly类型,client内部判断如果是这种类型,在执行投递到线程池的ConsumeRequest任务时,需要获取到对应queue队列的锁才能继续调用到用户代码,保证了第二点的顺序性。

六、存储模块补充

5.1 abort

broker在启动的时候会创建一个空的名为abort的文件,并在shutdown时将其删除,用于标识进程是否正常退出,如果不正常退出,会在启动时做故障恢复(todo:分析具体逻辑)

5.2 commitlog和consumequeue

发给同一个broker的所有topic消息均顺序写在commitlog当中(包括消费失败的消息)。每条消息的大小不定,因为commitlog本身是无序且不定长的。所以需要有一种文件来记录每个topic每条消息存储的物理offset,即consumequeue。每个consumequeue文件顺序记录了某个broker中的某个queue的commitlog offset。但是要做到以groupName来分组消费,我们还需要以每种groupName创建一类可以存储每个group消费进度的文件,即config/consumerOffset.json。

5.2.1 刷盘逻辑
5.3 index文件

index是rocketmq的索引文件,如果producer要让一条消息支持索引查询,在发送前需要指定message的key字段。producer或者consumer可以根据方法queryMessage(协议号12)查询所有broker中key是该值的消息记录。消息塞入实现在IndexFile#putKey,消息获取实现在IndexFile#selectPhyOffset

每个index文件由 header(40byte),slot table(4byte500w,每个索引消息的位置: hash(topic+key)%500w),index list(20byte200w,存储消息在commitlog的位置信息) 三个部分组成。hash冲突如何解决?因为要写入文件,开链法肯定行不通。rocketmq采取的方式是indexList部分顺序写,同时每个index记录存储了前一个相同hash的index的位置。而最尾部的index节点位置存储在slot table中。

index文件有如下几个缺点(自己总结的,可能有谬误)

index文件中没有存储topic+key的值,因此对给定一个key,查询出来的结果可能包含无效值(其他hash值一样的key),需要client二次过滤,因此client需要尽量确保key是唯一的。

client在查询时,给定key,maxNum,如果实际获取的list比较大,会查询不全。但是rocketmq没有提供分页的机制。

七、待完善的部分

1.事务

2.Filtersrv服务

3.HA

上一篇 下一篇

猜你喜欢

热点阅读