ZK leader 事务请求处理流程
Follower的处理链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
同时还有一个SyncRequestProcessor响应leader的proposal,后续我们详细分析
Leader的processor链:PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor。
接下来我们来分析下其处理过程。
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
...
// 请求交由下一个processor(commitProcessor)处理
nextProcessor.processRequest(request);
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
xx
}
follower将请求转发给leader后,leader收到请求提交Proposal
follower接收到proposal后将操作写到日志,响应ack给leader
我们来看下follower怎么处理proposal的
public class Follower extends Learner{
void followLeader() throws InterruptedException {
...
try {
QuorumServer leaderServer = findLeader();
try {
// 创建与leader连接
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 将当前节点信息注册到leader上
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
// 与leader进行数据同步
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 接收leader请求包,并进行处理
readPacket(qp);
processPacket(qp);
}
} ...
}
protected void processPacket(QuorumPacket qp)throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr =new TxnHeader();
Record txn =SerializeUtils.deserializeTxn(qp.getData(),hdr);
if (hdr.getZxid() !=lastQueued +1) {
LOG.warn("Got zxid 0x"
+Long.toHexString(hdr.getZxid())
+" expected 0x"
+Long.toHexString(lastQueued +1));
}
lastQueued =hdr.getZxid();
if (hdr.getType() ==OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn)txn;
QuorumVerifier qv =self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv,true);
}
fzk.logRequest(hdr,txn);
break;
FollowerZooKeeperServer.logRequest() 创建事务请求日志
public void logRequest(TxnHeader hdr,Record txn) {
Request request =new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
if ((request.zxid &0xffffffffL) !=0) {
pendingTxns.add(request);
}
syncProcessor.processRequest(request);
}
public void processRequest(Request request) {
// request.addRQRec(">sync");
queuedRequests.add(request);
}
SyncProcessor的作用我们都知道,就是将当前请求进行事务日志保存。
而事务日志保存完成后,则直接交由SendAckRequestProcessor来处理
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
public void processRequest(Request si) {
if(si.type != OpCode.sync){
// 直接返回leader ack响应包
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
...
}
}
}
}
FollowerZooKeeperServer.commit() 提交事务proposal
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
// 最终交由commitProcessor处理,详见3.1
commitProcessor.commit(request);
}
}
总结:关于这块的处理,读者可以对照着Leader处理事务请求的过程来比对着看。
Follower关于事务请求还是分为两部分:
接收leader proposal请求,记录事务日志后,返回ack响应;
接收leader commit请求,将请求交由CommitProcessor处理;
CommitProcessor
CommitProcessor.commit() 提交leader事务proposal
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
synchronized public void commit(Request request) {
if (!finished) {
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
// 很简单,直接将请求放入committedRequests
committedRequests.add(request);
notifyAll();
}
}
}
follower提交事务proposal的方式很简单,就是将请求放入committedRequests集合中,依据我们之前Leader节点对CommitProcessor的分析,在如下
CommitProcessor.run() 处理请求
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// leader获取的请求集合
LinkedList<Request> queuedRequests = new LinkedList<Request>();
// 已经被follower 提交的请求集合
LinkedList<Request> committedRequests = new LinkedList<Request>();
public void run() {
try {
Request nextPending = null;
while (!finished) {
int len = toProcess.size();
for (int i = 0; i < len; i++) {
// 5.请求proposal已完成,交由下个processor处理即可
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
// 2.若没有收到请求且没有收到leader的commit请求,则等待
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait();
continue;
}
// 3.committedRequests不为空,说明当前follower已经接受到leader的commit请求
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
// 4.本次请求可以提交给下个processor处理
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
// 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
}与leader中CommitProcessor的处理类似,读者可以按照上面数字排序来分析整个过程
FinalRequestProcessor
最终都是交由FinalRequestProcessor来处理,这块我们已经分析过很多次了,不再赘述。
总结:
这里通过分析Follower节点处理请求(事务请求)的过程,可以了解到:Follower本身并不处理事务请求,而是直接转发给leader来处理;
但是follower会配合leader进行proposal的处理,最终将节点信息添加到当前ZKDatabase。