zk源码阅读49:CommitProcessor源码解析
摘要
事务提交处理器。对于非事务请求,该处理器会直接将其交付给下一级处理器处理;对于事务请求,其会等待集群内针对Proposal的投票直到该Proposal可被提交,利用CommitProcessor,每个服务器都可以很好地控制对事务请求的顺序处理。
属性
private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
/**
* Requests that we are holding until the commit comes in.
*/
LinkedList<Request> queuedRequests = new LinkedList<Request>();//请求队列
/**
* Requests that have been committed.
*/
LinkedList<Request> committedRequests = new LinkedList<Request>();
RequestProcessor nextProcessor;//下一个处理器
ArrayList<Request> toProcess = new ArrayList<Request>();//待处理的队列
/**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
* be true if the CommitProcessor is in a Leader pipeline.
*/
boolean matchSyncs;//看sync的请求是等待leader回复,还是说直接处理,像读请求一样。对于leader是false,对于learner是true
volatile boolean finished = false;
说明:
commitProcessor区分事务请求和非事务请求
matchSyncs 在leader端是false,learner端是true,因为learner端sync请求需要等待leader回复,而leader端本身则不需要
函数
构造函数
public CommitProcessor(RequestProcessor nextProcessor, String id,
boolean matchSyncs, ZooKeeperServerListener listener) {
super("CommitProcessor:" + id, listener);
this.nextProcessor = nextProcessor;
this.matchSyncs = matchSyncs;
}
processRequest
处理请求
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
if (!finished) {
queuedRequests.add(request);//生产到请求队列
notifyAll();
}
}
注意上锁
commit
提交请求请求
synchronized public void commit(Request request) {//事务请求提交
if (!finished) {//只要没有结束
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
committedRequests.add(request);//进入已提交队列
notifyAll();//通知
}
}
shutdown
关闭
public void shutdown() {
LOG.info("Shutting down");
synchronized (this) {
finished = true;
queuedRequests.clear();
notifyAll();
}
if (nextProcessor != null) {
nextProcessor.shutdown();
}
}
run
核心的线程方法,先贴代码再分析
@Override
public void run() {
try {
Request nextPending = null;//下一个未处理的事务请求(不含leader端的sync请求),只要为null,都会while循环从queuedRequests里面找到第一个事务请求,或者直到队列为空
while (!finished) {//只要没有shutdown
int len = toProcess.size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i));//待处理队列交给下个处理器,按顺序处理
}
toProcess.clear();//队列清空
synchronized (this) {//注意这里上锁,不会出现执行到过程中,queuedRequests的size变了
if ((queuedRequests.size() == 0 || nextPending != null) //这部分结合尾部的while来读,要么 请求队列remove干净,要么从中找到一个事务请求,赋值给nextPending, 不允许size>0且nextPending == null的情况
&& committedRequests.size() == 0) {//且 没有已提交事务
wait();
continue;
}
// First check and see if the commit came in for the pending
// request
if ((queuedRequests.size() == 0 || nextPending != null)// 不允许size>0且nextPending == null的情况
&& committedRequests.size() > 0) {//如果有 已提交的请求
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {//如果和nextPending匹配
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
toProcess.add(nextPending);//加入待处理队列
nextPending = null;//下一个pend的请求清空
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);//这种情况是nextPending还没有来的及设置,nextPending==null的情况(代码应该再细分一下if else),不可能出现nextPending!=null而走到了这里的情况(算异常)
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {//如果还有 未处理的事务请求(不含leader端的sync请求),就continue
continue;
}
synchronized (this) {//这一段的目的是找到一个 给nextPending赋值
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {//只要queuedRequests队列不空,从中找到第一个 事务请求(不含leader端的sync请求),前面的其他请求全部加入待处理队列
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;//大部分事务请求直接赋给nextPending,然后break
case OpCode.sync:
if (matchSyncs) {//如果需要等leader返回,该值learner端为true
nextPending = request;
} else {
toProcess.add(request);//不需要的话,直接加入待处理队列里
}
break;//leader端matchSyncs是false,learner端才需要等leader回复,这里也break
default:
toProcess.add(request);//非事务请求,都直接加入待处理队列
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
注意各种上锁控制并发
里面的代码写的晦涩难懂,是我看过zk代码里面最想吐槽的代码了。现在最新版本的zk这个类已经改的面目全非了。
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
代码可以拆成几个部分
1,2部分 3,4部分完全可以按照 1,4,2,3的顺序来读,
1部分:遍历toProcess队列(非事务请求或者已经提交的事务请求),交给下一个处理器处理,清空
4部分:只要不存在pend住的事务请求并且请求队列不为空,一直遍历请求队列直到出现第一个事务请求或者队列遍历完,其间所有非事务请求全部加入toProcess队列,代表可以直接交给下一个处理器处理的
2部分:在请求队列remove干净或者找到了事务请求的情况下,
如果没有提交的请求,就等待。
如果有提交的请求,取出来,看和之前记录的下一个pend的请求是否match。
match的话,进入toProcess队列,nextPending置空
不match的话,(基本上是nextPending为null,不会出现不为null且不匹配的情况),进入toProcess处理
3部分:如果 nextPending非空,就不用再去遍历请求队列,找到下一个事务请求(即4部分),因此continue掉
思考
事务连续性怎么保证的
《paoxs到zk》说这里保证的,对此强烈怀疑。
事务连续性看代码应该是各角色机器单线程处理保证的。(refer中 新版本就多线程了,一写多读)
因为run方法2部分里面的else根本没有检测和nextPending不match的情况
因此个人理解2部分的else中,基本都是nextPending为null,属于还没来的及找nextPending,然后commit方法就被调用了,就直接处理了
完善的写法应该是这里写清楚,至少做一个不为空且不match的检查才好
run方法第2部分if语句的理解
(queuedRequests.size() == 0 || nextPending != null)
这个是针对第4部分while循环的条件,取!
就是说要么队列清空了 要么 找到nextPending
不允许 请求队列不为空 且不存在 nextPending的情况
run方法nextPending的意义
下一个要处理的事务请求
吐槽
run方法
这是我看zk以来最糟心的代码。
顺序上面已经说过了,按1,4,2,3来看
然后if条件,第二部分直接把
(queuedRequests.size() == 0 || nextPending != null)
抽到上层去不行吗,一定要写两遍吗。
然后else根本没有完成检查,让人一开始根本搞不清楚nextPending的意义是什么,
反正匹配不匹配,大家都进入toProcess队列。何必要写nextPending。
看起来像是保证事务顺序的,实际上事务顺序是单线程保证的,和nextPending也没关系。
refer
http://www.jianshu.com/p/68c91b42ccd8
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
《paxos到zk》