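RocketMQ Source Code: HA (High Availability)
RocketMQ's HA shows up mainly in how the server side, Namesrv and the broker, is configured. HA here is achieved through clustering, so let's look at each component in turn.
Namesrv High Availability
A Namesrv cluster achieves high availability by running multiple Namesrv instances. The relationship between the nodes is unusual:
Namesrv nodes are completely independent of one another: they do not communicate or exchange data, and each simply serves as one more node sharing the load. When a node goes down, the others are unaffected and keep serving; the Namesrv cluster only becomes unavailable when every node is down.
1. Code where the broker registers with Namesrv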
public RegisterBrokerResult registerBrokerAll(/* parameters omitted */) {
    RegisterBrokerResult registerBrokerResult = null;
    // Step 1
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                // Step 2
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }
                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                // A failure on one Namesrv is logged and the loop moves on to the next one
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            }
        }
    }
    return registerBrokerResult;
}
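1. Get the full list of Namesrv nodes, nameServerAddressList.
2. Loop over the list so that the broker registers itself with every Namesrv. Because every broker does this, all Namesrv nodes end up holding the same routing data even though they never synchronize with each other.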
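The two steps above can be sketched in isolation. This is a minimal illustration, not RocketMQ code: the class RegisterAllSketch, the method registerAll, and the registerOne callback are hypothetical names standing in for the real remoting call. It shows the one property that matters for HA, namely that a failure on one name server does not stop registration with the rest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: register with every name server, tolerating individual failures. */
class RegisterAllSketch {

    /**
     * Calls registerOne for every address and returns the addresses that accepted
     * the registration. registerOne stands in for the real remote call and may throw.
     */
    static List<String> registerAll(List<String> namesrvAddrs, Function<String, Boolean> registerOne) {
        List<String> registered = new ArrayList<>();
        if (namesrvAddrs == null) {
            return registered;
        }
        for (String addr : namesrvAddrs) {
            try {
                if (Boolean.TRUE.equals(registerOne.apply(addr))) {
                    registered.add(addr);
                }
            } catch (RuntimeException e) {
                // One unreachable name server must not block registration with the others.
                System.out.println("register failed on " + addr + ": " + e.getMessage());
            }
        }
        return registered;
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("ns-a:9876", "ns-b:9876", "ns-c:9876");
        List<String> ok = registerAll(addrs, addr -> {
            if (addr.startsWith("ns-b")) {
                throw new IllegalStateException("connection refused");
            }
            return true;
        });
        System.out.println("registered with: " + ok);
    }
}
```

In the sketch the addresses and the simulated failure of ns-b are made up for the demonstration; the surviving name servers still receive the registration.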
2. Producer access to Namesrv
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    ···
    ···
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);
                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null)
                        return channelNew;
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }
    return null;
}
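Producers and consumers both pick an available node from the Namesrv list to connect to: they reuse the currently chosen address while its channel is healthy, and otherwise walk the list round-robin until a channel can be created.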
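The round-robin walk in the loop above can be sketched on its own. This is an illustration under assumptions, not the client's code: NamesrvPickerSketch, pick, and the canConnect predicate (standing in for createChannel succeeding) are hypothetical. One deliberate difference: the sketch uses Math.floorMod instead of Math.abs followed by %, because Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative index once the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical sketch: round-robin failover across a list of name server addresses. */
class NamesrvPickerSketch {
    private final AtomicInteger index = new AtomicInteger(0);

    /**
     * Tries at most addrs.size() candidates, starting after the last position used,
     * and returns the first one that canConnect accepts, or null if none does.
     */
    String pick(List<String> addrs, Predicate<String> canConnect) {
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        for (int i = 0; i < addrs.size(); i++) {
            // floorMod keeps the position non-negative even after the counter overflows.
            int pos = Math.floorMod(index.incrementAndGet(), addrs.size());
            String candidate = addrs.get(pos);
            if (canConnect.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        NamesrvPickerSketch picker = new NamesrvPickerSketch();
        List<String> addrs = Arrays.asList("ns-a:9876", "ns-b:9876", "ns-c:9876");
        // Pretend ns-a is down; the picker skips it and moves on to the next address.
        for (int i = 0; i < 3; i++) {
            System.out.println(picker.pick(addrs, a -> !a.startsWith("ns-a")));
        }
    }
}
```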
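Broker High Availability
A Broker master-slave group consists of one Master node plus N Slave nodes.
The Master node serves both reads and writes; Slave nodes serve reads only.
Within each group, the Master keeps sending new CommitLog data to the Slaves, and each Slave keeps reporting to the Master the position up to which its local CommitLog has been synced.
Within a cluster, a Master can be one of two types, synchronous or asynchronous (brokerRole SYNC_MASTER or ASYNC_MASTER). A synchronous Master waits for the Slave to store the message before returning the send result to the Producer; an asynchronous Master does not wait.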
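The difference between the two Master types can be sketched as follows. This is a simplified model, not the broker's implementation: SyncAckSketch and its methods are hypothetical, the slave's progress is reduced to a single acknowledged offset, and the returned status strings are illustrative labels only.

```java
/** Hypothetical sketch: a sync master waits for the slave's ack, an async master does not. */
class SyncAckSketch {
    private long slaveAckOffset = 0L;

    /** Called when the slave reports the offset it has stored so far. */
    synchronized void onSlaveAck(long offset) {
        if (offset > slaveAckOffset) {
            slaveAckOffset = offset;
            notifyAll(); // wake up any producer request waiting for this offset
        }
    }

    /** Blocks until the slave has acknowledged requiredOffset, or the timeout elapses. */
    synchronized boolean awaitSlave(long requiredOffset, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (slaveAckOffset < requiredOffset) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Returns the result a producer would see for a message ending at endOffset. */
    String put(long endOffset, boolean syncMaster, long timeoutMillis) {
        if (!syncMaster) {
            return "SEND_OK"; // async master: reply without waiting for the slave
        }
        return awaitSlave(endOffset, timeoutMillis) ? "SEND_OK" : "FLUSH_SLAVE_TIMEOUT";
    }

    public static void main(String[] args) {
        SyncAckSketch master = new SyncAckSketch();
        System.out.println(master.put(100, false, 50)); // async: returns immediately
        System.out.println(master.put(100, true, 50));  // sync, slave silent: times out
        master.onSlaveAck(100);
        System.out.println(master.put(100, true, 50));  // sync, slave caught up
    }
}
```

The trade-off the sketch makes visible: the synchronous path adds the slave round trip (or the full timeout) to every send, in exchange for knowing the message exists on two nodes before the producer is told it succeeded.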
1. Master node logic
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // Step 1
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }
            // Step 2
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    ···
    ···
    ···
}
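1. this.processReadEvent() reads the offset reported by the slave, which tells the master the CommitLog position the slave is asking to sync from.
2. Checks whether the connection has timed out: if nothing has been read from the slave for longer than haHousekeepingInterval, the loop exits.
2. WriteSocketService transfers new CommitLog data to the Slave
3. Slave loop
The slave receives CommitLog data from the Master and reports back the physical offset up to which its own local CommitLog has been synced.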
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Step 1
            if (this.connectMaster()) {
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                this.selector.select(1000);
                // Step 2
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Step 3
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}
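1. Once connected, checks whether the report interval has elapsed and, if so, reports the slave's offset to the master and checks the result; this report doubles as a heartbeat.
2. Same-named method as on the master side, but here it reads the CommitLog data sent by the master and tracks the resulting CommitLog offset.
3. If the Master has not returned data for too long, the connection is closed.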
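Steps 1 and 3 boil down to two timers: one decides when to send the next offset report (the heartbeat), the other decides when the peer has been silent for too long (housekeeping). A minimal sketch of that bookkeeping follows; HaLivenessSketch and its method names are hypothetical, time is passed in explicitly so the logic can be checked without sleeping, and the interval values in main are assumptions chosen for the example.

```java
/** Hypothetical sketch: heartbeat and housekeeping timers for an HA connection. */
class HaLivenessSketch {
    private final long heartbeatIntervalMs;
    private final long housekeepingIntervalMs;
    private long lastReportMs;
    private long lastDataMs;

    HaLivenessSketch(long heartbeatIntervalMs, long housekeepingIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.housekeepingIntervalMs = housekeepingIntervalMs;
        this.lastReportMs = nowMs;
        this.lastDataMs = nowMs;
    }

    /** True when enough time has passed since the last offset report. */
    boolean isTimeToReport(long nowMs) {
        return nowMs - lastReportMs >= heartbeatIntervalMs;
    }

    /** Record that an offset report was sent at nowMs. */
    void onReported(long nowMs) {
        lastReportMs = nowMs;
    }

    /** Record that data arrived from the peer at nowMs. */
    void onDataReceived(long nowMs) {
        lastDataMs = nowMs;
    }

    /** True when the peer has been silent longer than the housekeeping interval. */
    boolean isExpired(long nowMs) {
        return nowMs - lastDataMs > housekeepingIntervalMs;
    }

    public static void main(String[] args) {
        HaLivenessSketch conn = new HaLivenessSketch(5_000, 20_000, 0);
        System.out.println(conn.isTimeToReport(5_000)); // heartbeat is due
        System.out.println(conn.isExpired(20_001));     // peer silent too long: close and reconnect
    }
}
```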
4. Producer sends messages
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Check whether the latency fault-tolerance switch is on
    if (this.sendLatencyFaultEnable) {
        try {
            // Look for an available queue whose brokerName equals lastBrokerName
            // (any available queue if lastBrokerName is null)
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Pick a relatively good broker, without requiring it to be available
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fall back to the next queue in rotation
        return tpInfo.selectOneMessageQueue();
    }
    // Switch is off: let TopicPublishInfo choose a queue based on lastBrokerName, without checking availability
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
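Let's look at the broker selection logic when sendLatencyFaultEnable is on:
1. First pick a queue whose broker equals lastBrokerName and is available, meaning the broker has not been added to the latency fault-tolerance object latencyFaultTolerance because of excessive latency. If lastBrokerName is null, any available queue qualifies.
2. If step 1 finds no suitable queue, drop the broker == lastBrokerName condition and send to a relatively good broker chosen by pickOneAtLeast().
3. If that also fails, fall back to the next queue in rotation, ignoring availability.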
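The three-step fallback can be sketched with a toy fault table. This is an illustration under assumptions rather than the producer's code: QueueSelectSketch, Mq, penalize, and the "unavailable until" timestamps are hypothetical, and "relatively good" is modeled here as the penalized broker whose penalty ends soonest, which is a simplification of whatever ranking the real pickOneAtLeast() applies.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: queue selection with a latency fault table and two fallbacks. */
class QueueSelectSketch {
    /** Minimal stand-in for a message queue: broker name plus queue id. */
    static final class Mq {
        final String broker;
        final int id;
        Mq(String broker, int id) { this.broker = broker; this.id = id; }
        @Override public String toString() { return broker + "#" + id; }
    }

    private final Map<String, Long> unavailableUntil = new HashMap<>();
    private int counter = 0;

    /** Marks a broker as unavailable until the given timestamp. */
    void penalize(String broker, long untilMs) {
        unavailableUntil.put(broker, untilMs);
    }

    boolean isAvailable(String broker, long nowMs) {
        Long until = unavailableUntil.get(broker);
        return until == null || nowMs >= until;
    }

    Mq select(List<Mq> queues, String lastBroker, long nowMs) {
        if (queues.isEmpty()) {
            return null;
        }
        // Step 1: an available queue on lastBroker (or any available queue if lastBroker is null).
        for (int i = 0; i < queues.size(); i++) {
            Mq q = queues.get(Math.floorMod(counter++, queues.size()));
            if (isAvailable(q.broker, nowMs) && (lastBroker == null || q.broker.equals(lastBroker))) {
                return q;
            }
        }
        // Step 2: the least bad penalized broker, here the one whose penalty ends soonest.
        Mq leastBad = null;
        long soonest = Long.MAX_VALUE;
        for (Mq q : queues) {
            Long until = unavailableUntil.get(q.broker);
            if (until != null && until < soonest) {
                soonest = until;
                leastBad = q;
            }
        }
        if (leastBad != null) {
            return leastBad;
        }
        // Step 3: next queue in rotation, ignoring availability.
        return queues.get(Math.floorMod(counter++, queues.size()));
    }

    public static void main(String[] args) {
        List<Mq> queues = Arrays.asList(new Mq("A", 0), new Mq("A", 1), new Mq("B", 0));
        QueueSelectSketch selector = new QueueSelectSketch();
        selector.penalize("A", 2_000);
        selector.penalize("B", 1_000);
        // Both brokers are penalized at t=500, so step 2 picks the one that recovers first.
        System.out.println(selector.select(queues, null, 500));
    }
}
```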
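Summary
1. RocketMQ achieves Broker high availability by running multiple Broker master-slave groups that together form a cluster.
2. Broker master-slave groups are unrelated to one another: they do not communicate or synchronize data across groups.
3. Namesrv nodes relate to each other much as Broker master-slave groups do: they are independent, share the load, and do not communicate or synchronize data.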