RocketMQJava 程序员linux

深入剖析 RocketMQ 源码 - 负载均衡机制

2022-04-07  本文已影响0人  马小莫QAQ

一、引言

RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。

RocketMQ主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr负责存储元数据,各组件的主要功能如下:

RocketMQ整体消息处理逻辑上以Topic维度进行生产消费、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。

本篇文章分析的核心在于介绍RocketMQ的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。

二、RocketMQ的整体架构

图片来自于Apache RocketMQ

RocketMQ架构上主要分为四部分,如上图所示:

RocketMQ的Topic的物理分布如上图所示:

Topic作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的Broker当中。

Broker中的Queue是Topic对应消息的物理存储单元。

在RocketMQ的整体设计理念当中,消息的生产消费以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创建对应的MessageQueue。

producer生产消息的过程本质上就是选择Topic在Broker的所有的MessageQueue并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。

consumer消费消息的过程本质上就是一个订阅同一个Topic的consumerGroup下的每个consumer按照一定的规则负责Topic下一部分MessageQueue进行消费。

在RocketMQ整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到Broker选择的负载均衡,消息的消费过程主要涉及多consumer和多Broker之间的负责均衡。

三、producer消息生产过程

producer消息生产过程:

3.1 路由同步过程

public class MQClientInstance {

    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 省略对应的代码
                    } else {
                        // 1、负责查询指定的Topic对应的路由信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }

                    if (topicRouteData != null) {
                        // 2、比较路由数据topicRouteData是否发生变更
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // 生成生产者对应的Topic信息
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // 保存到本地生产者路由表当中
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
            }
        } catch (InterruptedException e) {
        }

        return false;
    }
}

路由同步过程

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // 按照broker维度保存的Queue信息
    private List<QueueData> queueDatas;
    // 按照broker维度保存的broker信息
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

public class QueueData implements Comparable<QueueData> {
    // broker的名称
    private String brokerName;
    // 读队列大小
    private int readQueueNums;
    // 写队列大小
    private int writeQueueNums;
    // 读写权限
    private int perm;
    private int topicSynFlag;
}

public class BrokerData implements Comparable<BrokerData> {
    // broker所属集群信息
    private String cluster;
    // broker的名称
    private String brokerName;
    // broker对应的ip地址信息
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}

--------------------------------------------------------------------------------------------------

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // 最细粒度的队列信息
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // Topic信息
    private String topic;
    // 所属的brokerName信息
    private String brokerName;
    // Topic下的队列信息Id
    private int queueId;
}

路由解析过程:

public class MQClientInstance {

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {

        TopicPublishInfo info = new TopicPublishInfo();

        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
          // 省略相关代码
        } else {

            List<QueueData> qds = route.getQueueDatas();

            // 按照brokerName进行排序
            Collections.sort(qds);

            // 遍历所有broker生成队列维度信息
            for (QueueData qd : qds) {
                // 具备写能力的QueueData能够用于队列生成
                if (PermName.isWriteable(qd.getPerm())) {
                    // 遍历获得指定brokerData进行异常条件过滤
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {
                        continue;
                    }
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }

                    // 遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }

            info.setOrderTopic(false);
        }

        return info;
    }
}

路由生成过程:

{
    "TBW102": [{
        "brokerName": "broker-a",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }, {
        "brokerName": "broker-b",
        "perm": 7,
        "readQueueNums": 8,
        "topicSynFlag": 0,
        "writeQueueNums": 8
    }]
}

路由解析举例:

3.2 负载均衡过程

public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // 1、查询消息发送的TopicPublishInfo信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

        if (topicPublishInfo != null && topicPublishInfo.ok()) {

            String[] brokersSent = new String[timesTotal];
            // 根据重试次数进行消息发送
            for (; times < timesTotal; times++) {
                // 记录上次发送失败的brokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2、从TopicPublishInfo获取发送消息的队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3、执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (MQBrokerException e) {
                        // 省略相关代码
                    } catch (InterruptedException e) {
                        // 省略相关代码
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }
        }
    }
}

消息发送过程:

public class TopicPublishInfo {

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // 按照轮询进行选择发送的MessageQueue
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // 避开上一次上一次发送失败的MessageQueue
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
}

路由选择过程:

Producer消息发送示意图

四、consumer消息消费过程

consumer消息消费过程

4.1 路由同步过程

public class MQClientInstance {

    // 1、启动定时任务从namesvr定时同步路由信息
    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }

    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();

        // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }

        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {

        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 省略代码
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }

                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // 构建consumer侧的路由信息
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }

                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }

        return false;
    }
}

路由同步过程

4.2 负载均衡过程

public class RebalanceService extends ServiceThread {

    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));

    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

    }
}

负载均衡过程

public abstract class RebalanceImpl {

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // 省略相关代码
                break;
            }
            case CLUSTERING: { // 集群模式下的负载均衡
                // 1、获取topic下所有的MessageQueue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

                // 2、获取topic下该consumerGroup下所有的consumer对象
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                // 3、开始重新分配进行rebalance
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4、通过分配策略重新进行分配
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {

                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 5、根据分配结果执行真正的rebalance动作
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

重新分配流程

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<MessageQueue>();

        // 核心逻辑计算开始

        // 计算当前cid的下标
        int index = cidAll.indexOf(currentCID);

        // 计算多余的模值
        int mod = mqAll.size() % cidAll.size();

        // 计算平均大小
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 计算起始下标
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 计算范围大小
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // 组装结果
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    // 核心逻辑计算结束

    @Override
    public String getName() {
        return "AVG";
    }
}

------------------------------------------------------------------------------------

rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。

rocketMq上存在名为topic_demo的topic,writeQueue写队列数量为3,分布在3个broker。
排序后的mqAll的大小为9,依次为
[broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2]

rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为
[192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959]

192.168.0.6@15956 的分配MessageQueue结算过程
index:0
mod:9%4=1
averageSize:9 / 4 + 1 = 3
startIndex:0
range:3
messageQueue:[broker_a_0、broker_a_1、broker_a_2]

192.168.0.6@15957 的分配MessageQueue结算过程
index:1
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:3
range:2
messageQueue:[broker_b_0、broker_b_1]

192.168.0.6@15958 的分配MessageQueue结算过程
index:2
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:5
range:2
messageQueue:[broker_b_2、broker_c_0]

192.168.0.6@15959 的分配MessageQueue结算过程
index:3
mod:9%4=1
averageSize:9 / 4 = 2
startIndex:7
range:2
messageQueue:[broker_c_1、broker_c_2]

分配策略分析:

consumer的分配

五、RocketMQ指定机器消费设计思路

日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由那台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费
        if (!cidAll.contains(currentCID)) {
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

consumer负载均衡策略改写

六、小结

本文主要介绍了RocketMQ在生产和消费过程中的负载均衡机制,结合源码和实际案例力求给读者一个易于理解的技术普及,希望能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未涉及,也有很多技术细节未详细阐述,如有疑问欢迎继续交流。

作者:vivo互联网技术
链接:https://juejin.cn/post/7083676205022445582
来源:稀土掘金

上一篇下一篇

猜你喜欢

热点阅读