RocketMQ Source Code: HA (High Availability)

2018-05-09  激情的狼王

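RocketMQ's HA is mainly a matter of how the server side, Namesrv and Broker, is deployed and configured. Since HA is achieved through clustering, let's look at each in turn.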

Namesrv High Availability

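Namesrv achieves high availability by running several Namesrv instances as a cluster. The relationship between the nodes is unusual:

The Namesrv nodes have no relationship with one another. They neither communicate nor exchange data, and each one simply takes a share of the load. When one node goes down, the others are unaffected and keep serving. The Namesrv cluster becomes unavailable only if every node goes down.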

1. Code for a Broker registering with Namesrv

public RegisterBrokerResult registerBrokerAll(/* a bunch of parameters */) {
    RegisterBrokerResult registerBrokerResult = null;
    // Step 1: get the addresses of all Namesrv nodes
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                // Step 2: register this broker with each Namesrv in turn;
                // a failure on one node is only logged and does not stop the loop
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }

                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            }
        }
    }

    return registerBrokerResult;
}

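1. Get the full list of Namesrv addresses, nameServerAddressList.
2. Loop over the Namesrv nodes and register this Broker with every one of them. Every Broker registers with every Namesrv, so all Namesrv nodes end up holding the same routing data even though they never talk to each other.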
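The following plain-Java simulation sketches this fan-out. It does not use RocketMQ's API. `NameServerStub`, its `register` method and the addresses in `main` are hypothetical stand-ins for a Namesrv node and the `registerBroker` RPC. The point is that each node is tried independently, so a dead Namesrv only produces a log line and cannot block registration with the others.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegisterAllSketch {

    // Hypothetical stand-in for one Namesrv node; it only keeps a broker table.
    static class NameServerStub {
        final String addr;
        final boolean alive;
        final Map<String, String> brokerTable = new HashMap<>(); // brokerName -> brokerAddr

        NameServerStub(String addr, boolean alive) {
            this.addr = addr;
            this.alive = alive;
        }

        // Hypothetical registration RPC: fails when the node is down.
        String register(String brokerName, String brokerAddr) {
            if (!alive) {
                throw new IllegalStateException("namesrv " + addr + " unreachable");
            }
            brokerTable.put(brokerName, brokerAddr);
            return "OK from " + addr;
        }
    }

    // Registers with every Namesrv in turn; a failing node is logged and skipped,
    // and the last non-null result is returned, like registerBrokerAll above.
    static String registerBrokerAll(List<NameServerStub> nameServers, String brokerName, String brokerAddr) {
        String lastResult = null;
        for (NameServerStub ns : nameServers) {
            try {
                String result = ns.register(brokerName, brokerAddr);
                if (result != null) {
                    lastResult = result;
                }
            } catch (Exception e) {
                System.out.println("registerBroker exception, " + ns.addr + ": " + e.getMessage());
            }
        }
        return lastResult;
    }

    public static void main(String[] args) {
        List<NameServerStub> servers = new ArrayList<>();
        servers.add(new NameServerStub("ns-a:9876", true));
        servers.add(new NameServerStub("ns-b:9876", false)); // simulated outage
        servers.add(new NameServerStub("ns-c:9876", true));
        System.out.println(registerBrokerAll(servers, "broker-a", "192.168.0.10:10911"));
    }
}
```

Because the nodes never sync with each other, consistency here comes entirely from the Broker writing the same data to each of them.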

2. Producer access to Namesrv

private Channel getAndCreateNameserverChannel() throws InterruptedException {
    // ...
    // ...
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Reuse the currently chosen Namesrv while its channel is still healthy
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            // Otherwise walk the address list round-robin (at most one full pass)
            // and return the first node a channel can be created to
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

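Producers and consumers both connect to a single node chosen from the Namesrv list. The client keeps using the current node while its channel is healthy. Otherwise it walks the list round-robin via namesrvIndex and takes the first node it can open a channel to.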
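The selection logic can be sketched without Netty. In this minimal sketch, assumptions are called out explicitly. `NamesrvChooserSketch` and `choose` are hypothetical names. The `reachable` set stands in for "`createChannel(addr)` returned a channel". A `synchronized` method replaces the `tryLock` on `lockNamesrvChannel`. `Math.floorMod` is used instead of `Math.abs(...) % size` so the index can never go negative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class NamesrvChooserSketch {
    private final AtomicInteger namesrvIndex = new AtomicInteger(0);
    private String chosen; // plays the role of namesrvAddrChoosed

    // 'reachable' is a hypothetical stand-in for "createChannel(addr) succeeded".
    synchronized String choose(List<String> addrList, Set<String> reachable) {
        // Keep the current node while it is still healthy.
        if (chosen != null && reachable.contains(chosen)) {
            return chosen;
        }
        // Otherwise make one round-robin pass over the list; floorMod avoids the
        // negative index that Math.abs(Integer.MIN_VALUE) would produce.
        for (int i = 0; i < addrList.size(); i++) {
            int index = Math.floorMod(namesrvIndex.incrementAndGet(), addrList.size());
            String newAddr = addrList.get(index);
            chosen = newAddr;
            if (reachable.contains(newAddr)) {
                return newAddr;
            }
        }
        return null; // every Namesrv is down
    }

    public static void main(String[] args) {
        NamesrvChooserSketch chooser = new NamesrvChooserSketch();
        List<String> addrs = Arrays.asList("ns-a:9876", "ns-b:9876", "ns-c:9876");
        Set<String> up = new HashSet<>(Arrays.asList("ns-a:9876", "ns-c:9876"));
        System.out.println(chooser.choose(addrs, up)); // ns-b is skipped
        up.remove("ns-c:9876");
        System.out.println(chooser.choose(addrs, up)); // fails over to another node
    }
}
```

Because every Namesrv holds the same routing data, the client can fail over to any node without extra coordination.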

Broker High Availability

A Broker master-slave group consists of one Master node and N Slave nodes.
The Master serves both reads and writes; Slaves serve reads only.

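Within each master-slave group, the Master continuously sends new CommitLog data to the Slaves. Each Slave continuously reports back to the Master the CommitLog position it has synced up to.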

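Within a cluster, a Master is one of two types: synchronous or asynchronous (brokerRole SYNC_MASTER or ASYNC_MASTER). A synchronous Master waits until the Slave has stored the message before returning the send result to the Producer. An asynchronous Master returns without waiting.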
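The difference between the two Master types can be sketched as a wait on the Slave's reported offset. This is a loose model, not RocketMQ's implementation. `SyncMasterSketch`, `send` and `onSlaveReport` are hypothetical. The returned strings only borrow the names of RocketMQ's `SendStatus` values `SEND_OK` and `FLUSH_SLAVE_TIMEOUT`. A synchronous Master blocks the send until the Slave has acknowledged an offset at least as large as the end of the new message, or gives up after a timeout.

```java
import java.util.concurrent.TimeUnit;

public class SyncMasterSketch {
    enum Role { SYNC_MASTER, ASYNC_MASTER }

    private final Role role;
    private long slaveAckOffset = 0; // highest CommitLog offset the Slave has reported

    SyncMasterSketch(Role role) {
        this.role = role;
    }

    // Called whenever the Slave reports its synced offset.
    synchronized void onSlaveReport(long offset) {
        if (offset > slaveAckOffset) {
            slaveAckOffset = offset;
            notifyAll(); // wake producers blocked in send()
        }
    }

    // nextOffset: CommitLog offset just past the message being sent.
    synchronized String send(long nextOffset, long timeoutMillis) throws InterruptedException {
        if (role == Role.ASYNC_MASTER) {
            return "SEND_OK"; // async: reply immediately, replication happens in the background
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (slaveAckOffset < nextOffset) { // loop also guards against spurious wakeups
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return "FLUSH_SLAVE_TIMEOUT";
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining); // releases the monitor while waiting
        }
        return "SEND_OK";
    }

    public static void main(String[] args) throws InterruptedException {
        SyncMasterSketch master = new SyncMasterSketch(Role.SYNC_MASTER);
        Thread slave = new Thread(() -> master.onSlaveReport(128));
        slave.start();
        System.out.println(master.send(128, 3_000));
        slave.join();
    }
}
```

The trade-off is visible in the sketch. The synchronous type pays a round trip to the Slave on every send, and in exchange a message acknowledged to the Producer already exists on two machines.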

1. Master-side logic

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // Step 1: read the CommitLog offset reported by the Slave
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }
            // Step 2: give up on the connection if nothing has been read for too long
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    // ...
}

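1. this.processReadEvent() reads the CommitLog offset reported by the Slave, i.e. the position from which the Slave requests synchronization.
2. Check whether the connection has timed out. If nothing has been read from this Slave for longer than haHousekeepingInterval, the loop exits.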
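Both steps can be sketched over a plain `ByteBuffer`. Assumptions are stated explicitly. Each Slave report is taken to be a single 8-byte long, and only the newest complete report in the buffer matters, so older ones are skipped. The class and method names are hypothetical, and time is passed in as a parameter instead of read from a system clock.

```java
import java.nio.ByteBuffer;

public class MasterReadSketch {
    // Assumed wire format: each Slave report is one 8-byte long (its max CommitLog offset).
    static final int REPORT_SIZE = 8;

    private int processPosition = 0;  // bytes of the buffer already handled
    private long slaveAckOffset = -1; // latest offset reported by the Slave
    private long lastReadTimestamp;

    MasterReadSketch(long now) {
        this.lastReadTimestamp = now;
    }

    // 'buf' holds everything received so far (position = bytes written into it).
    void onRead(ByteBuffer buf, int bytesJustRead, long now) {
        if (bytesJustRead <= 0) {
            return;
        }
        lastReadTimestamp = now; // any traffic counts as a sign of life
        int written = buf.position();
        if (written - processPosition >= REPORT_SIZE) {
            int end = written - (written % REPORT_SIZE);      // end of the last complete report
            slaveAckOffset = buf.getLong(end - REPORT_SIZE);  // absolute read, position unchanged
            processPosition = end;
        }
    }

    // Step 2 of the loop: has the Slave been silent for too long?
    boolean expired(long now, long housekeepingIntervalMillis) {
        return now - lastReadTimestamp > housekeepingIntervalMillis;
    }

    long slaveAckOffset() {
        return slaveAckOffset;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(1024L).putLong(2048L).put((byte) 1); // two full reports + a partial one
        MasterReadSketch m = new MasterReadSketch(0);
        m.onRead(buf, 17, 100);
        System.out.println(m.slaveAckOffset());        // 2048
        System.out.println(m.expired(30_000, 20_000)); // true
    }
}
```

The trailing partial byte is left for the next read, which is why the offset is taken from the last 8-byte boundary rather than from the end of the buffer.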

2. WriteSocketService transfers new CommitLog data to the Slave

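The write side can be sketched as framing CommitLog bytes, starting from the offset the Slave asked for. This is a minimal sketch under stated assumptions, not the real WriteSocketService. The frame header is assumed to be an 8-byte start offset followed by a 4-byte body size. The CommitLog is modeled as one in-memory byte array starting at offset 0. `maxBatchSize` is a hypothetical cap on how much is sent per frame.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HaFrameEncoderSketch {
    // Assumed header: 8-byte start offset in the CommitLog + 4-byte body size.
    static final int HEADER_SIZE = 8 + 4;

    static ByteBuffer encode(long startOffset, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + body.length);
        frame.putLong(startOffset);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip(); // switch to read mode, ready for socketChannel.write(frame)
        return frame;
    }

    // Hypothetical: the CommitLog is one byte array starting at offset 0.
    // Sends at most maxBatchSize bytes from where the Slave asked to continue;
    // with nothing new, the frame is header-only (body size 0).
    static ByteBuffer nextFrame(byte[] commitLog, long nextTransferFromWhere, int maxBatchSize) {
        int from = (int) nextTransferFromWhere;
        int size = Math.min(maxBatchSize, commitLog.length - from);
        byte[] body = Arrays.copyOfRange(commitLog, from, from + size);
        return encode(nextTransferFromWhere, body);
    }

    public static void main(String[] args) {
        byte[] commitLog = "hello-commitlog".getBytes(StandardCharsets.UTF_8); // 15 bytes
        ByteBuffer first = nextFrame(commitLog, 0, 8);
        System.out.println(first.getInt(8));  // 8
        ByteBuffer second = nextFrame(commitLog, 8, 8);
        System.out.println(second.getInt(8)); // 7
    }
}
```

Putting the start offset in every frame lets the receiver check that each chunk continues exactly where its local copy ends.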

3. Slave loop
The Slave receives CommitLog data from the Master and reports back to the Master the physical offset up to which its local CommitLog has been synced.

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Step 1: when it is time, report the local max offset to the Master (doubles as a heartbeat)
            if (this.connectMaster()) {

                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);
                // Step 2: read the CommitLog data sent by the Master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Step 3: close the connection if the Master has been silent for too long
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

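1. Check whether the reporting interval has elapsed. If it has, report the current offset to the Master and check the result; this report also serves as a heartbeat. If the report fails, the connection to the Master is closed.
2. This is the counterpart of the Master's method of the same name. It reads the CommitLog data sent by the Master and advances the local CommitLog offset.
3. If the Master has not sent anything for too long (longer than haHousekeepingInterval), close the connection.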
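The Slave's side of the exchange can be sketched as the mirror image of the frame format sketched for the Master: parse complete frames, append their bodies, then report the new max offset. The same assumptions apply: a 12-byte header made of an 8-byte start offset and a 4-byte body size, and an 8-byte offset report. The local CommitLog is modeled as a `ByteArrayOutputStream` starting at offset 0, and all class and method names are hypothetical. A frame whose offset does not match the local end is rejected, which corresponds to closing the connection in the loop above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class SlaveDispatchSketch {
    static final int HEADER_SIZE = 8 + 4; // assumed: start offset (long) + body size (int)

    // Hypothetical local CommitLog: an in-memory byte stream starting at offset 0.
    private final ByteArrayOutputStream localCommitLog = new ByteArrayOutputStream();
    private long lastReportTimestamp = 0;

    long maxPhyOffset() {
        return localCommitLog.size();
    }

    // Consumes every complete frame in 'received' (read mode) and leaves partial ones unread.
    // Returns false if a frame does not continue exactly where the local CommitLog ends.
    boolean dispatch(ByteBuffer received) {
        while (received.remaining() >= HEADER_SIZE) {
            int start = received.position();
            long masterPhyOffset = received.getLong(start);
            int bodySize = received.getInt(start + 8);
            if (received.remaining() < HEADER_SIZE + bodySize) {
                break; // body has not fully arrived yet
            }
            if (masterPhyOffset != maxPhyOffset()) {
                return false; // gap or overlap: caller should close the connection
            }
            byte[] body = new byte[bodySize];
            received.position(start + HEADER_SIZE);
            received.get(body);
            localCommitLog.write(body, 0, bodySize);
        }
        return true;
    }

    // Heartbeat rule: report at least once per intervalMillis, even if nothing changed.
    boolean isTimeToReport(long now, long intervalMillis) {
        return now - lastReportTimestamp > intervalMillis;
    }

    // The 8-byte report sent back to the Master: the local max offset.
    ByteBuffer buildReport(long now) {
        lastReportTimestamp = now;
        ByteBuffer report = ByteBuffer.allocate(8);
        report.putLong(maxPhyOffset());
        report.flip();
        return report;
    }

    public static void main(String[] args) {
        SlaveDispatchSketch slave = new SlaveDispatchSketch();
        ByteBuffer received = ByteBuffer.allocate(64);
        received.putLong(0L).putInt(3).put(new byte[] {1, 2, 3}); // one complete frame
        received.flip();
        System.out.println(slave.dispatch(received) + " " + slave.maxPhyOffset()); // true 3
        System.out.println(slave.buildReport(1_000).getLong());                    // 3
    }
}
```

Reporting the offset after every append is what lets a synchronous Master know when a message is safely on the Slave.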

4. Producer sending messages

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Is the latency fault-tolerance switch on?
    if (this.sendLatencyFaultEnable) {
        try {
            // Pick an available queue whose brokerName equals lastBrokerName
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Pick a relatively good broker, ignoring availability, and a queue on it
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fall back to picking a queue round-robin
        return tpInfo.selectOneMessageQueue();
    }
    // Switch off: pick a queue round-robin, skipping lastBrokerName where possible;
    // latency-based availability is not checked
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

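Let's walk through how the queue, and therefore the broker, is chosen:

1. With sendLatencyFaultEnable on, first look for an available queue on the broker named lastBrokerName, or on any broker when lastBrokerName is null. "Available" means the queue's broker has not been added to the latency fault-tolerance object latencyFaultTolerance because of excessive latency.
2. If step 1 finds no suitable queue, drop the broker==lastBrokerName condition and send to a relatively good broker returned by latencyFaultTolerance.pickOneAtLeast().
3. If that fails too, fall back to tpInfo.selectOneMessageQueue(), which picks a queue round-robin.
With the switch off, tpInfo.selectOneMessageQueue(lastBrokerName) picks a queue round-robin without checking latency-based availability.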
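The core idea of the fault-tolerant branch, isolating a slow broker for a while and skipping its queues, can be sketched in a few lines. This is a deliberate simplification. It drops the lastBrokerName condition and the pickOneAtLeast() step. `QueueSelectorSketch`, `markFault` and the fixed isolation window are hypothetical and stand in for how latencyFaultTolerance records and expires fault items.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueSelectorSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);
    // brokerName -> time (ms) until which that broker is treated as unavailable
    private final Map<String, Long> faultUntil = new ConcurrentHashMap<>();

    QueueSelectorSketch(List<Queue> queues) {
        this.queues = queues;
    }

    // Hypothetical stand-in for recording a slow or failed send: isolate the broker for a while.
    void markFault(String brokerName, long now, long isolationMillis) {
        faultUntil.put(brokerName, now + isolationMillis);
    }

    boolean isAvailable(String brokerName, long now) {
        Long until = faultUntil.get(brokerName);
        return until == null || now >= until;
    }

    // Round-robin over all queues, skipping brokers that are still isolated.
    Queue select(long now) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (isAvailable(q.brokerName, now)) {
                return q;
            }
        }
        // Every broker is isolated: fall back to plain round-robin rather than failing.
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        QueueSelectorSketch selector = new QueueSelectorSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        selector.markFault("broker-a", 0, 1_000); // broker-a was slow at t=0
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(500)); // only broker-b queues while isolated
        }
        System.out.println(selector.select(1_500)); // broker-a is eligible again
    }
}
```

Like the original's final fallback, the sketch never refuses to return a queue. Isolation only changes which queue is preferred.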

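Summary

1. RocketMQ achieves Broker high availability by running multiple Broker master-slave groups as a cluster.
2. Broker master-slave groups are independent of one another: they neither communicate nor synchronize data.
3. Namesrv nodes relate to each other the same way Broker master-slave groups do: they are independent, share the load, and neither communicate nor synchronize data.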
