RocketMQ主从切换

2020-01-09  本文已影响0人  丑人林宗己

本文主要是记录raft协议的学习过程,包括如下几个方面

raft是一种分布式共识算法,具体的一些简述可从百度/维基百科等搜索查阅,我个人觉得想要了解该协议,不妨从一些常见的分布式典型场景来分析。

比如很典型的MySQL主备问题。一般情况下MySQL都是一主多从的架构,而绝大多数人比较少关注主备切换,即当主库发生宕机或者服务奔溃等等问题导致无法正常提供服务,原来备份的数据库会立刻被切出来作为主库。

如果主库挂了,能从一堆从库中自动选举一个出来作为主库继续运行服务,那么是不是就可以去掉备库,或者将备库改为从库,全部对外提供服务?

这里涉及的问题就是

MySQL的主从架构在数据一致性上有一定的乏力,很多需求往往最终都通过指定主库查询来解决数据时效性问题。但是主从也有很大的优势,因为不需要保证主从之间的数据一致性问题。架构有其取舍之道,而技术选型也理应从实际业务场景出发。

关于数据一致性的问题,可以从CAP的角度进行分析。

raft协议在关于解决如上的两个问题方面,给出了一种折中的解决方案,同类的产品包括paxos,zab等等,raft协议的核心在于如下

选举机制

Leader, Follower, Candidate

想要理解整个选举的过程,最好看看该动画,而在选举的过程中涉及几个概念如下

流程大致是:所有节点的初始化状态时Follower,由于没有接收到来自Leader的心跳检查而进入Candidate,随后Candidate会随机生成一个timeout,在150ms 到 300ms,当Candidate的timeout时间到了之后,会向其他节点发起投票,在得到绝大多数票的节点会成为Leader,成为Leader之后就像其他节点发送心跳检测

RocketMQ 选举的实现方案

当节点启动时有几种状况

public enum RESULT {
        UNKNOWN,
        ACCEPT,  /** 表示接受本轮投票*/
        REJECT_UNKNOWN_LEADER,
        REJECT_UNEXPECTED_LEADER,
        REJECT_EXPIRED_VOTE_TERM,
        REJECT_ALREADY_VOTED,
        REJECT_ALREADY__HAS_LEADER, /** 集群已存在LEADER*/
        REJECT_TERM_NOT_READY,
        REJECT_TERM_SMALL_THAN_LEDGER,
        REJECT_EXPIRED_LEDGER_TERM,
        REJECT_SMALL_LEDGER_END_INDEX;
}

要想很好的确定这些值得意义,可阅读handleVote()方法,结果大致翻译为

if(投票的ID不在状态机配置的ID列表中) return REJECT_UNKNOWN_LEADER;
if(不是自投 && 状态机自己的ID == 请求投票的ID) return REJECT_UNEXPECTED_LEADER;
if(请求投票的任期 < 状态机维护的当前任期) return REJECT_EXPIRED_VOTE_TERM;
else if(请求投票的任期 == 状态机维护的当前任期) {
    if (状态机未投票) {
       //let it go
    } else if (状态机的投票跟请求投票ID一致) {
       //repeat just let it go
    } else {
        if (状态机本身已经有了LEADER) {
           return RESULT.REJECT_ALREADY__HAS_LEADER
        } else {
             return RESULT.REJECT_ALREADY_VOTED
        }
     }
}
else {
  将角色修改为候选者,并更新状态机的任期;
  needIncreaseTermImmediately = true;
  return VoteResponse.RESULT.REJECT_TERM_NOT_READY;
}

if(请求的LEADER的任期 < 状态即维护的LEADER的任期) {
 return  VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM
}else if(前者相等的前提下 && 请求LEADER的最终Index < 状态机维护的LEADER的Index) {
  return REJECT_SMALL_LEDGER_END_INDEX;
}
if(请求的任期 < 状态机维护的LEADER的任期) return REJECT_TERM_SMALL_THAN_LEDGER;
状态机投请求LEADER一票;
返回 RESULT.ACCEPT;

要特别注意,处理投票的方法是,是通过synchronized给状态机加锁的,意味着此时只有一个线程可以更新状态机的状态

发起投票后,节点会等待一个时间后进行统计,时间为voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);

public enum ParseResult {
        WAIT_TO_REVOTE,
        REVOTE_IMMEDIATELY,
        PASSED,  /** 成为本次投票的LEADER */
        WAIT_TO_VOTE_NEXT;
}

要想很好的确定这些值得意义,可阅读maintainAsCandidate()方法,结果大致翻译为

if(如果所有已知投票结果组最大的任期 > 状态机当前任期) WAIT_TO_VOTE_NEXT;
else if(已经存在LEADER) {
  WAIT_TO_VOTE_NEXT;
  getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
}
else if(响应UNKONW的节点数量超过所有节点的/2 + 1 的数量时) {
  WAIT_TO_REVOTE;
  getNextTimeToRequestVote()
}else if(响应ACCEPT的节点数量超过所有节点的/2 + 1的数量时) {
  PASSED;
}else if((响应ACCEPT的节点数量+还未准备任期的机器数量) 超过所有节点的/2 + 1的数量时) {
 REVOTE_IMMEDIATELY;
}else if((响应ACCEPT的节点数量 + LEADER的Index更前的数量) 超过所有节点的/2 + 1的数量时) {
 WAIT_TO_REVOTE;
 getNextTimeToRequestVote(); // 这里的Index关乎日志的复制
}else {
  WAIT_TO_VOTE_NEXT;
  getNextTimeToRequestVote();
}
if(结果是PASSED) {
  更新状态机为LEADER;
}

要注意区分WAIT_TO_REVOTEWAIT_TO_VOTE_NEXT
如果节点更正为LEADER后呢?下次要做的就是向其他节点发起心跳检查sendHeartbeats

如果集群节点已经存在LEADER则会发起等待下次投票,而下次投票时间是随机时间 + 200 * 3,已经算上节点收到心跳的最长时间,而当节点收到心跳检查时如果节点还未维护LEADER则将节点更正为FOLLOWER

如果节点更正为FOLLOWER,间断检查上次收到心跳的时间是否大于 两次心跳间隔,如果是则锁定状态机,并判断上次心跳时间是否大于 maxHeartBeatLeak * heartBeatTimeIntervalMs(默认相当于三次心跳间隔),如果是将状态机更新为CANDIDATE

当发生角色更正时,会将节点的主从进行变更,代码参考DLedgerRoleChangeHandler#handle()

至此,选举与主从关系正式建立。选举机制保证了有一个正常的机器可以被推选出来成为主节点。成为主节点的一个核心诉求是数据的日志必须是最靠前的,即Index要是集群节点中最新的

状态机日志复制

状态的日志复制流程,从动画中也可以看出来

客户端向主节点发送一条日志,主节点更正状态机的日志,同时向其他从节点发送日志,收到响应后再想客户端响应SUCCESS

随后主节点在向其他从节点发送信息提示对日志进行Commit,收到响应认为Commit成功。

从主从节点数据复制的角度来看,对于客户端而言,从发送数据到收到响应则认为数据已经在各个从节点有了,第二次进行Commit操作,是指对于本集群的日志(前文提到的Index)进行了Commit,与操作系统对应的刷盘等无关系,当然具体实现也可以与之相连,只是从Raft协议角度而言有日志Commit的设定

RocketMQ 状态机日志复制的实现方案

对于客户端而言是一次数据发送,而对于实现了Raft协议的框架而言,则是日志,一条数据对应于一条日志,一条日志的描述语为DLedgerEntry

public class DLedgerEntry {

    public final static int POS_OFFSET = 4 + 4 + 8 + 8; // magix - size - index -term
    public final static int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4; // - pos - channel - chainCrc -bodyCrc
    public final static int BODY_OFFSET = HEADER_SIZE + 4; // - body.length 

    private int magic;
    private int size;
    private long index;  /** 本条日志对应整个集群的Index*/
    private long term;   /** 本条日志对应整个集群的任期*/
    private long pos; //used to validate data
    private int channel; //reserved
    private int chainCrc; //like the block chain, this crc indicates any modification before this entry.
    private int bodyCrc; //the crc of the body
    private byte[] body; /** 数据 */
}

如果对于DLedgerEntry如上的注释不够清晰,可以查阅DLedgerEntryCoder,一下子就豁然开朗,比如其中的一个编码方法

public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
        byteBuffer.clear();
        int size = entry.computSizeInBytes();
        //always put magic on the first position
        byteBuffer.putInt(entry.getMagic());
        byteBuffer.putInt(size);
        byteBuffer.putLong(entry.getIndex());
        byteBuffer.putLong(entry.getTerm());
        byteBuffer.putLong(entry.getPos());
        byteBuffer.putInt(entry.getChannel());
        byteBuffer.putInt(entry.getChainCrc());
        byteBuffer.putInt(entry.getBodyCrc());
        byteBuffer.putInt(entry.getBody().length);
        byteBuffer.put(entry.getBody());
        byteBuffer.flip();
 }

整个日志复制的核心类为DLedgerEntryPusher,查阅其启动方法

public void startup() {
     entryHandler.start(); // 从节点处理主节点日志复制
     quorumAckChecker.start(); // 主节点等待日志ACK(Raft协议复制协议数据复制第一步ACK后才可以响应客户端)
     for (EntryDispatcher dispatcher : dispatcherMap.values()) {
         dispatcher.start(); // 主节点像各个从节点复制日志
     }
}

对于RocketMQ而言,Commilog是一个很核心的概念,而在主从集群的架构下,核心类为DLedgerCommitLog

如果对于之前关于CommitlogConsumerQueue等等不够了解的话,请查阅之前的文章

查阅DLedgerCommitLog类中一个非常核心的方法putMessage,其中有非常核心的代码片段,如下

AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);

这里便是进入了日志复制的入口了。

DLedgerServer#handleAppend方法中可以看到

appendAsLeader

该方法跟主节点的日志写入到文件(内存)息息相关

public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
    // ....
    ByteBuffer dataBuffer = localEntryBuffer.get(); // 线程副本
    ByteBuffer indexBuffer = localIndexBuffer.get(); // 线程副本
    DLedgerEntryCoder.encode(entry, dataBuffer); // 编码
    int entrySize = dataBuffer.remaining(); // 计算日志槽点的字节数
    synchronized (memberState) { // 锁住状态机
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
        long nextIndex = ledgerEndIndex + 1; // Leader最新Index + 1
        entry.setIndex(nextIndex);
        entry.setTerm(memberState.currTerm());
        entry.setMagic(CURRENT_MAGIC);
        DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
        long prePos = dataFileList.preAppend(dataBuffer.remaining()); // 预追加,通过先写入魔数将内存页加载到内存中,并计算出追加的文件位置(该方法比较重要)
        entry.setPos(prePos);
        PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.setPos(dataBuffer, prePos); // 
        for (AppendHook writeHook : appendHooks) { // 钩子
            writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
        }
        long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());// 完成追加
        PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
        PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer); // 处理索引文件
        long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
        PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
        if (logger.isDebugEnabled()) {
            logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
        }
        ledgerEndIndex++; // 主节点的最新Index完成 + 1
        ledgerEndTerm = memberState.currTerm();
        if (ledgerBeginIndex == -1) {
            ledgerBeginIndex = ledgerEndIndex;
        }
        updateLedgerEndIndexAndTerm(); // 更新主节点的日志Index,以及任期
        return entry;
    }
}

waitAck

在这里会看到一个设计,称之为PeerWaterMark,即水位。我个人理解为按照水位上涨的思路来理解日志的追加,对象是peerWaterMarksByTerm

public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {
    updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex()); // 更新主节点水位
    if (memberState.getPeerMap().size() == 1) { // 只有一个节点
        AppendEntryResponse response = new AppendEntryResponse();
        response.setGroup(memberState.getGroup());
        response.setLeaderId(memberState.getSelfId());
        response.setIndex(entry.getIndex());
        response.setTerm(entry.getTerm());
        response.setPos(entry.getPos());
        return AppendFuture.newCompletedFuture(entry.getPos(), response);
    } else {
        checkTermForPendingMap(entry.getTerm(), "waitAck");
        AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // 超时时间默认为2500
        future.setPos(entry.getPos());
        // 将异步的响应结果放在了pendingAppendResponsesByTerm中维护
        CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
        if (old != null) {
            logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
        }
        wakeUpDispatchers();// 唤醒复制追加的线程
        return future;
    }
}

doAppendInner

向某个节点追加日志的核心方法

private void doAppendInner(long index) throws Exception {
    // ....
    responseFuture.whenComplete((x, ex) -> {
        try {
            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
            switch (responseCode) {
                case SUCCESS:
                    pendingMap.remove(x.getIndex()); // 
                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex()); // 更新对应节点水位
                    quorumAckChecker.wakeup(); // 唤醒日志追加ACK线程
                    break;
                case INCONSISTENT_STATE:
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);// 需要进行对比,来更正日志
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    lastPushCommitTimeMs = System.currentTimeMillis();
}

QuorumAckChecker#doWork()

该方法略长,想了解的自行查阅,这里提供比较核心的代码片段

Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm); // 获取任期内各个节点的水位

long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {
    int num = 0;
    for (Long another : peerWaterMarks.values()) {
        if (another >= index) {
            num++;
        }
    }
    if (memberState.isQuorum(num) && index > quorumIndex) { // 满足(n + 2)/1就对该日志进行commit
        quorumIndex = index;
    }
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
// 得出的quorumIndex为最高的可提交的Index

// ....
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex >= 0) {
    for (Long i = quorumIndex; i >= 0; i--) {
        try {
            CompletableFuture<AppendEntryResponse> future = responses.remove(i);
            if (future == null) {
                needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) { // 如果该异步对象还未响应处理
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) future).getPos());
                future.complete(response);// 完成异步对象(此时客户端请求线程被唤醒)
            }
            ackNum++;
        } catch (Throwable t) {
            logger.error("Error in ack to index={} term={}", i, currTerm, t);
        }
    }
}

如上流程大致完成了日志复制的基础工作

流程大致是:


image.png image.png

其他还有日志提交,日志对比,截取等等,后续抽空补上。但是理论上不妨碍理解Raft协议,以及RocketMQ的主从切换设计方案

补充:

3、截取成功后将线程状态更改为append继续追加日志

PS:感觉这部分代码写的是真的……

上一篇下一篇

猜你喜欢

热点阅读