Zookeeper

ZK leader 事务请求处理流程

2022-04-08  本文已影响0人  Alen_ab56

Follower的处理链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

同时还有一个SyncRequestProcessor响应leader的proposal,后续我们详细分析

Leader的processor链:PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor。

接下来我们来分析下其处理过程。

public void run() {

        try {

            while (!finished) {

                Request request = queuedRequests.take();

                ...

                // 请求交由下一个processor(commitProcessor)处理   

                nextProcessor.processRequest(request);

                switch (request.type) {

                case OpCode.sync:

                    zks.pendingSyncs.add(request);

                    zks.getFollower().request(request);

                    break;

xx

}

follower将请求转发给leader后,leader收到请求提交Proposal

follower接收到proposal后将操作写到日志,响应ack给leader

我们来看下follower怎么处理proposal的

public class Follower extends Learner{

    void followLeader() throws InterruptedException {

        ...

        try {

            QuorumServer leaderServer = findLeader();           

            try {

                // 创建与leader连接

                connectToLeader(leaderServer.addr, leaderServer.hostname);

                // 将当前节点信息注册到leader上

                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);

                // 与leader进行数据同步

                syncWithLeader(newEpochZxid);               

                QuorumPacket qp = new QuorumPacket();

                while (this.isRunning()) {

                    // 接收leader请求包,并进行处理

                    readPacket(qp);

                    processPacket(qp);

                }

            } ...

    }

protected void processPacket(QuorumPacket qp)throws Exception{

switch (qp.getType()) {

case Leader.PING:

ping(qp);

break;

case Leader.PROPOSAL:

TxnHeader hdr =new TxnHeader();

Record txn =SerializeUtils.deserializeTxn(qp.getData(),hdr);

if (hdr.getZxid() !=lastQueued +1) {

LOG.warn("Got zxid 0x"

                    +Long.toHexString(hdr.getZxid())

+" expected 0x"

                    +Long.toHexString(lastQueued +1));

}

lastQueued =hdr.getZxid();

if (hdr.getType() ==OpCode.reconfig){

SetDataTxn setDataTxn = (SetDataTxn)txn;

QuorumVerifier qv =self.configFromString(new String(setDataTxn.getData()));

self.setLastSeenQuorumVerifier(qv,true);

}

fzk.logRequest(hdr,txn);

break;

FollowerZooKeeperServer.logRequest() 创建事务请求日志

public void logRequest(TxnHeader hdr,Record txn) {

Request request =new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());

if ((request.zxid &0xffffffffL) !=0) {

pendingTxns.add(request);

}

syncProcessor.processRequest(request);

}

public void processRequest(Request request) {

// request.addRQRec(">sync");

    queuedRequests.add(request);

}

SyncProcessor的作用我们都知道,就是将当前请求进行事务日志保存。

而事务日志保存完成后,则直接交由SendAckRequestProcessor来处理

public class SendAckRequestProcessor implements RequestProcessor, Flushable {

    public void processRequest(Request si) {

        if(si.type != OpCode.sync){

            // 直接返回leader ack响应包

            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,

                null);

            try {

                learner.writePacket(qp, false);

            } catch (IOException e) {

              ...

            }

        }

    }

}

FollowerZooKeeperServer.commit() 提交事务proposal

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {

public void commit(long zxid) {

        if (pendingTxns.size() == 0) {

            LOG.warn("Committing " + Long.toHexString(zxid)

                    + " without seeing txn");

            return;

        }

        long firstElementZxid = pendingTxns.element().zxid;

        if (firstElementZxid != zxid) {

            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)

                    + " but next pending txn 0x"

                    + Long.toHexString(firstElementZxid));

            System.exit(12);

        }

        Request request = pendingTxns.remove();

        // 最终交由commitProcessor处理,详见3.1

        commitProcessor.commit(request);

    }

}

总结:关于这块的处理,读者可以对照着Leader处理事务请求的过程来比对着看。

Follower关于事务请求还是分为两部分:

    接收leader proposal请求,记录事务日志后,返回ack响应;

    接收leader commit请求,将请求交由CommitProcessor处理;

CommitProcessor

CommitProcessor.commit() 提交leader事务proposal

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {

synchronized public void commit(Request request) {

        if (!finished) {

            if (request == null) {

                LOG.warn("Committed a null!",

                        new Exception("committing a null! "));

                return;

            }

            if (LOG.isDebugEnabled()) {

                LOG.debug("Committing request:: " + request);

            }

            // 很简单,直接将请求放入committedRequests

            committedRequests.add(request);

            notifyAll();

        }

    }

}

follower提交事务proposal的方式很简单,就是将请求放入committedRequests集合中,依据我们之前Leader节点对CommitProcessor的分析,在如下

CommitProcessor.run() 处理请求

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {

    // leader获取的请求集合

    LinkedList<Request> queuedRequests = new LinkedList<Request>();

    // 已经被follower 提交的请求集合

    LinkedList<Request> committedRequests = new LinkedList<Request>();

    public void run() {

        try {

            Request nextPending = null;           

            while (!finished) {

                int len = toProcess.size();

                for (int i = 0; i < len; i++) {

                    // 5.请求proposal已完成,交由下个processor处理即可

                    nextProcessor.processRequest(toProcess.get(i));

                }

                toProcess.clear();

                synchronized (this) {

                    // 2.若没有收到请求且没有收到leader的commit请求,则等待

                    if ((queuedRequests.size() == 0 || nextPending != null)

                            && committedRequests.size() == 0) {

                        wait();

                        continue;

                    }

                    // 3.committedRequests不为空,说明当前follower已经接受到leader的commit请求

                    if ((queuedRequests.size() == 0 || nextPending != null)

                            && committedRequests.size() > 0) {

                        Request r = committedRequests.remove();

                        if (nextPending != null

                                && nextPending.sessionId == r.sessionId

                                && nextPending.cxid == r.cxid) {

                            nextPending.hdr = r.hdr;

                            nextPending.txn = r.txn;

                            nextPending.zxid = r.zxid;

                            // 4.本次请求可以提交给下个processor处理

                            toProcess.add(nextPending);

                            nextPending = null;

                        } else {

                            // this request came from someone else so just

                            // send the commit packet

                            toProcess.add(r);

                        }

                    }

                }

                // We haven't matched the pending requests, so go back to

                // waiting

                if (nextPending != null) {

                    continue;

                }

                // 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到

                synchronized (this) {

                    // Process the next requests in the queuedRequests

                    while (nextPending == null && queuedRequests.size() > 0) {

                        Request request = queuedRequests.remove();

                        switch (request.type) {

                        case OpCode.create:

                        case OpCode.delete:

                        case OpCode.setData:

                        case OpCode.multi:

                        case OpCode.setACL:

                        case OpCode.createSession:

                        case OpCode.closeSession:

                            nextPending = request;

                            break;

                        case OpCode.sync:

                            if (matchSyncs) {

                                nextPending = request;

                            } else {

                                toProcess.add(request);

                            }

                            break;

                        default:

                            toProcess.add(request);

                        }

                    }

                }

            }

        } catch (InterruptedException e) {

            LOG.warn("Interrupted exception while waiting", e);

        } catch (Throwable e) {

            LOG.error("Unexpected exception causing CommitProcessor to exit", e);

        }

        LOG.info("CommitProcessor exited loop!");

    }

}与leader中CommitProcessor的处理类似,读者可以按照上面数字排序来分析整个过程

FinalRequestProcessor

最终都是交由FinalRequestProcessor来处理,这块我们已经分析过很多次了,不再赘述。

总结:

这里通过分析Follower节点处理请求(事务请求)的过程,可以了解到:Follower本身并不处理事务请求,而是直接转发给leader来处理;

但是follower会配合leader进行proposal的处理,最终将节点信息添加到当前ZKDatabase。

上一篇 下一篇

猜你喜欢

热点阅读