RocketMQ主从切换
本文主要是记录raft协议的学习过程,包括如下几个方面
- raft协议一些基本概念
- raft协议场景
- raft协议在rocketmq上的实践
raft是一种分布式共识算法,具体的一些简述可从百度/维基百科等搜索查阅,我个人觉得想要了解该协议,不妨从一些常见的分布式典型场景来分析。
比如很典型的MySQL主备问题。一般情况下MySQL都是一主多从的架构,而绝大多数人比较少关注主备切换,即当主库发生宕机或者服务奔溃等等问题导致无法正常提供服务,原来备份的数据库会立刻被切出来作为主库。
- 主库与备库数据一致性
- 正常情况下备库无法提供服务,只有主从库会提供服务
如果主库挂了,能从一堆从库中自动选举一个出来作为主库继续运行服务,那么是不是就可以去掉备库,或者将备库改为从库,全部对外提供服务?
这里涉及的问题就是
- 数据一致性
- 选举
MySQL的主从架构在数据一致性上有一定的乏力,很多需求往往最终都通过指定主库查询来解决数据时效性问题。但是主从也有很大的优势,因为不需要保证主从之间的数据一致性问题。架构有其取舍之道,而技术选型也理应从实际业务场景出发。
关于数据一致性的问题,可以从CAP的角度进行分析。
raft协议在关于解决如上的两个问题方面,给出了一种折中的解决方案,同类的产品包括paxos,zab等等,raft协议的核心在于如下
- 选举机制
- 状态机之间的日志复制
选举机制
Leader, Follower, Candidate
- Leader 可以认为是主从关系中的主
- Follower 可以认为是主从关系中的从
- Candidate 可以认为是主从选举过程中的一个协调者
想要理解整个选举的过程,最好看看该动画,而在选举的过程中涉及几个概念如下
- time: 定时器
- term: 任期
- heatbeat: 心跳
- vote: 投票
流程大致是:所有节点的初始化状态时Follower,由于没有接收到来自Leader的心跳检查而进入Candidate,随后Candidate会随机生成一个timeout,在150ms 到 300ms,当Candidate的timeout时间到了之后,会向其他节点发起投票,在得到绝大多数票的节点会成为Leader,成为Leader之后就像其他节点发送心跳检测
RocketMQ 选举的实现方案
- 节点的状态记录在
MemberState
中,默认初始状态为CANDIDATE
- 选举的核心类
DLedgerLeaderElector
,而Candidate
的的整个流程maintainAsCandidate
- 节点标识为类似
"n0-localhost:20911"
,n0
是ID,localhost:20911
则是访问IP+端口 - 判断投票成功过的公式为
num >= ((peerSize() / 2) + 1)
当节点启动时有几种状况
-
节点是一个个启动的,那么第一个节点启动时投票肯定时没有响应,响应即为
VoteResponse.RESULT.UNKNOWN
,第二个节点启动了,但是第一个节点如果成功投自己一票,第二个节点发起投票时就会响应集群已有leader,随后加入,其后的节点如此。 -
节点是瞬间同时启动,都向各方发起投票,那么第一轮投票大概率不会成功,
nextTimeToRequestVote
该值默认值是-1,所以启动就直接进入第一轮投票了,而第二轮投票开始通过getNextTimeToRequestVote()
计算了下次发起投票的时间,计算的方式是System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs)
,由于有了随机数的判定下次进行投票大概率会成功 -
发起投票响应状态
public enum RESULT {
UNKNOWN,
ACCEPT, /** 表示接受本轮投票*/
REJECT_UNKNOWN_LEADER,
REJECT_UNEXPECTED_LEADER,
REJECT_EXPIRED_VOTE_TERM,
REJECT_ALREADY_VOTED,
REJECT_ALREADY__HAS_LEADER, /** 集群已存在LEADER*/
REJECT_TERM_NOT_READY,
REJECT_TERM_SMALL_THAN_LEDGER,
REJECT_EXPIRED_LEDGER_TERM,
REJECT_SMALL_LEDGER_END_INDEX;
}
要想很好的确定这些值得意义,可阅读handleVote()
方法,结果大致翻译为
if(投票的ID不在状态机配置的ID列表中) return REJECT_UNKNOWN_LEADER;
if(不是自投 && 状态机自己的ID == 请求投票的ID) return REJECT_UNEXPECTED_LEADER;
if(请求投票的任期 < 状态机维护的当前任期) return REJECT_EXPIRED_VOTE_TERM;
else if(请求投票的任期 == 状态机维护的当前任期) {
if (状态机未投票) {
//let it go
} else if (状态机的投票跟请求投票ID一致) {
//repeat just let it go
} else {
if (状态机本身已经有了LEADER) {
return RESULT.REJECT_ALREADY__HAS_LEADER
} else {
return RESULT.REJECT_ALREADY_VOTED
}
}
}
else {
将角色修改为候选者,并更新状态机的任期;
needIncreaseTermImmediately = true;
return VoteResponse.RESULT.REJECT_TERM_NOT_READY;
}
if(请求的LEADER的任期 < 状态即维护的LEADER的任期) {
return VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM
}else if(前者相等的前提下 && 请求LEADER的最终Index < 状态机维护的LEADER的Index) {
return REJECT_SMALL_LEDGER_END_INDEX;
}
if(请求的任期 < 状态机维护的LEADER的任期) return REJECT_TERM_SMALL_THAN_LEDGER;
状态机投请求LEADER一票;
返回 RESULT.ACCEPT;
要特别注意,处理投票的方法是,是通过synchronized
给状态机加锁的,意味着此时只有一个线程可以更新状态机的状态
发起投票后,节点会等待一个时间后进行统计,时间为voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
- 投票的最终结果汇总为
public enum ParseResult {
WAIT_TO_REVOTE,
REVOTE_IMMEDIATELY,
PASSED, /** 成为本次投票的LEADER */
WAIT_TO_VOTE_NEXT;
}
要想很好的确定这些值得意义,可阅读maintainAsCandidate()
方法,结果大致翻译为
if(如果所有已知投票结果组最大的任期 > 状态机当前任期) WAIT_TO_VOTE_NEXT;
else if(已经存在LEADER) {
WAIT_TO_VOTE_NEXT;
getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
}
else if(响应UNKONW的节点数量超过所有节点的/2 + 1 的数量时) {
WAIT_TO_REVOTE;
getNextTimeToRequestVote()
}else if(响应ACCEPT的节点数量超过所有节点的/2 + 1的数量时) {
PASSED;
}else if((响应ACCEPT的节点数量+还未准备任期的机器数量) 超过所有节点的/2 + 1的数量时) {
REVOTE_IMMEDIATELY;
}else if((响应ACCEPT的节点数量 + LEADER的Index更前的数量) 超过所有节点的/2 + 1的数量时) {
WAIT_TO_REVOTE;
getNextTimeToRequestVote(); // 这里的Index关乎日志的复制
}else {
WAIT_TO_VOTE_NEXT;
getNextTimeToRequestVote();
}
if(结果是PASSED) {
更新状态机为LEADER;
}
要注意区分WAIT_TO_REVOTE
跟WAIT_TO_VOTE_NEXT
如果节点更正为LEADER
后呢?下次要做的就是向其他节点发起心跳检查sendHeartbeats
如果集群节点已经存在LEADER
则会发起等待下次投票,而下次投票时间是随机时间 + 200 * 3,已经算上节点收到心跳的最长时间,而当节点收到心跳检查时如果节点还未维护LEADER
则将节点更正为FOLLOWER
如果节点更正为FOLLOWER
,间断检查上次收到心跳的时间是否大于 两次心跳间隔,如果是则锁定状态机,并判断上次心跳时间是否大于 maxHeartBeatLeak * heartBeatTimeIntervalMs(默认相当于三次心跳间隔),如果是将状态机更新为CANDIDATE
当发生角色更正时,会将节点的主从进行变更,代码参考DLedgerRoleChangeHandler#handle()
- 角色变更为
CANDIDATE
,如果节点不是从节点,将节点更正为从节点 - 角色变更为
FOLLOWER
,将节点更正为从节点 - 角色变更为
LEADER
,将节点更正为主节点
至此,选举与主从关系正式建立。选举机制保证了有一个正常的机器可以被推选出来成为主节点。成为主节点的一个核心诉求是数据的日志必须是最靠前的,即
Index
要是集群节点中最新的
状态机日志复制
状态的日志复制流程,从动画中也可以看出来
客户端向主节点发送一条日志,主节点更正状态机的日志,同时向其他从节点发送日志,收到响应后再想客户端响应SUCCESS
。
随后主节点在向其他从节点发送信息提示对日志进行Commit
,收到响应认为Commit
成功。
从主从节点数据复制的角度来看,对于客户端而言,从发送数据到收到响应则认为数据已经在各个从节点有了,第二次进行Commit
操作,是指对于本集群的日志(前文提到的Index
)进行了Commit
,与操作系统对应的刷盘等无关系,当然具体实现也可以与之相连,只是从Raft协议
角度而言有日志Commit
的设定
RocketMQ 状态机日志复制的实现方案
对于客户端而言是一次数据发送,而对于实现了Raft协议
的框架而言,则是日志,一条数据对应于一条日志,一条日志的描述语为DLedgerEntry
public class DLedgerEntry {
public final static int POS_OFFSET = 4 + 4 + 8 + 8; // magix - size - index -term
public final static int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4; // - pos - channel - chainCrc -bodyCrc
public final static int BODY_OFFSET = HEADER_SIZE + 4; // - body.length
private int magic;
private int size;
private long index; /** 本条日志对应整个集群的Index*/
private long term; /** 本条日志对应整个集群的任期*/
private long pos; //used to validate data
private int channel; //reserved
private int chainCrc; //like the block chain, this crc indicates any modification before this entry.
private int bodyCrc; //the crc of the body
private byte[] body; /** 数据 */
}
如果对于DLedgerEntry
如上的注释不够清晰,可以查阅DLedgerEntryCoder
,一下子就豁然开朗,比如其中的一个编码方法
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
byteBuffer.clear();
int size = entry.computSizeInBytes();
//always put magic on the first position
byteBuffer.putInt(entry.getMagic());
byteBuffer.putInt(size);
byteBuffer.putLong(entry.getIndex());
byteBuffer.putLong(entry.getTerm());
byteBuffer.putLong(entry.getPos());
byteBuffer.putInt(entry.getChannel());
byteBuffer.putInt(entry.getChainCrc());
byteBuffer.putInt(entry.getBodyCrc());
byteBuffer.putInt(entry.getBody().length);
byteBuffer.put(entry.getBody());
byteBuffer.flip();
}
整个日志复制的核心类为DLedgerEntryPusher
,查阅其启动方法
public void startup() {
entryHandler.start(); // 从节点处理主节点日志复制
quorumAckChecker.start(); // 主节点等待日志ACK(Raft协议复制协议数据复制第一步ACK后才可以响应客户端)
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
dispatcher.start(); // 主节点像各个从节点复制日志
}
}
对于RocketMQ
而言,Commilog
是一个很核心的概念,而在主从集群的架构下,核心类为DLedgerCommitLog
如果对于之前关于Commitlog
, ConsumerQueue
等等不够了解的话,请查阅之前的文章
查阅DLedgerCommitLog
类中一个非常核心的方法putMessage
,其中有非常核心的代码片段,如下
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
这里便是进入了日志复制的入口了。
在DLedgerServer#handleAppend
方法中可以看到
- 判断
dLedgerEntryPusher.isPendingFull(currTerm)
,该判断的语义是判断当前任期中有是否超过了等待复制追加的最大请求数,maxPendingRequestsNum
,默认值是10000
,如果超过了则响应一个LEADER_PENDING_FULL
,该值的意义应该是在于控制并发。 - 否则调用
dLedgerStore.appendAsLeader
,并返回一个dLedgerEntryPusher.waitAck(resEntry)
appendAsLeader
该方法跟主节点的日志写入到文件(内存)息息相关
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
// ....
ByteBuffer dataBuffer = localEntryBuffer.get(); // 线程副本
ByteBuffer indexBuffer = localIndexBuffer.get(); // 线程副本
DLedgerEntryCoder.encode(entry, dataBuffer); // 编码
int entrySize = dataBuffer.remaining(); // 计算日志槽点的字节数
synchronized (memberState) { // 锁住状态机
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
long nextIndex = ledgerEndIndex + 1; // Leader最新Index + 1
entry.setIndex(nextIndex);
entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
long prePos = dataFileList.preAppend(dataBuffer.remaining()); // 预追加,通过先写入魔数将内存页加载到内存中,并计算出追加的文件位置(该方法比较重要)
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos); //
for (AppendHook writeHook : appendHooks) { // 钩子
writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());// 完成追加
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer); // 处理索引文件
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
if (logger.isDebugEnabled()) {
logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
}
ledgerEndIndex++; // 主节点的最新Index完成 + 1
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm(); // 更新主节点的日志Index,以及任期
return entry;
}
}
waitAck
在这里会看到一个设计,称之为PeerWaterMark,即水位。我个人理解为按照水位上涨的思路来理解日志的追加,对象是peerWaterMarksByTerm
public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {
updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex()); // 更新主节点水位
if (memberState.getPeerMap().size() == 1) { // 只有一个节点
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getSelfId());
response.setIndex(entry.getIndex());
response.setTerm(entry.getTerm());
response.setPos(entry.getPos());
return AppendFuture.newCompletedFuture(entry.getPos(), response);
} else {
checkTermForPendingMap(entry.getTerm(), "waitAck");
AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // 超时时间默认为2500
future.setPos(entry.getPos());
// 将异步的响应结果放在了pendingAppendResponsesByTerm中维护
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
if (old != null) {
logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
}
wakeUpDispatchers();// 唤醒复制追加的线程
return future;
}
}
doAppendInner
向某个节点追加日志的核心方法
private void doAppendInner(long index) throws Exception {
// ....
responseFuture.whenComplete((x, ex) -> {
try {
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
pendingMap.remove(x.getIndex()); //
updatePeerWaterMark(x.getTerm(), peerId, x.getIndex()); // 更新对应节点水位
quorumAckChecker.wakeup(); // 唤醒日志追加ACK线程
break;
case INCONSISTENT_STATE:
logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
changeState(-1, PushEntryRequest.Type.COMPARE);// 需要进行对比,来更正日志
break;
default:
logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
break;
}
} catch (Throwable t) {
logger.error("", t);
}
});
lastPushCommitTimeMs = System.currentTimeMillis();
}
QuorumAckChecker#doWork()
该方法略长,想了解的自行查阅,这里提供比较核心的代码片段
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm); // 获取任期内各个节点的水位
long quorumIndex = -1;
for (Long index : peerWaterMarks.values()) {
int num = 0;
for (Long another : peerWaterMarks.values()) {
if (another >= index) {
num++;
}
}
if (memberState.isQuorum(num) && index > quorumIndex) { // 满足(n + 2)/1就对该日志进行commit
quorumIndex = index;
}
}
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
// 得出的quorumIndex为最高的可提交的Index
// ....
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex >= 0) {
for (Long i = quorumIndex; i >= 0; i--) {
try {
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
if (future == null) {
needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
break;
} else if (!future.isDone()) { // 如果该异步对象还未响应处理
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setTerm(currTerm);
response.setIndex(i);
response.setLeaderId(memberState.getSelfId());
response.setPos(((AppendFuture) future).getPos());
future.complete(response);// 完成异步对象(此时客户端请求线程被唤醒)
}
ackNum++;
} catch (Throwable t) {
logger.error("Error in ack to index={} term={}", i, currTerm, t);
}
}
}
如上流程大致完成了日志复制的基础工作
流程大致是:
image.png image.png
其他还有日志提交,日志对比,截取等等,后续抽空补上。但是理论上不妨碍理解Raft协议
,以及RocketMQ
的主从切换设计方案
补充:
- 当维护相当多的请求的时候,发生了选举,选举了新的Leader,那么挂着的响应线程将如何处理?
- ACK线程是将起响应为TERM_CHANGED,但是在DLegerCommitLog却没有对应的处理
- 何时进行日志commit?
- 主库进行doAppend(),当writeIndex超过ledgerEndIndex时会进行commit
- 如果从库的日志落后于主库?
1、主库通知从库append时检查 PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE)
2、主库收到响应,将线程状态更改为PushEntryRequest.Type.COMPARE,线程下次发起compare请求,以当前的Index与从库进行对比,(如果本次对比不成功则compareIndex--),继续对比- 从库的校验逻辑:PreConditions.check(request.getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
- 当获取到从库最后一个与主库一样的日志后开始进行截取(DLedgerEntry.equlse)
3、截取成功后将线程状态更改为append继续追加日志
PS:感觉这部分代码写的是真的……