Zookeeper(四)-客户端-消息处理流程

2020-12-23  本文已影响0人  进击的蚂蚁zzzliu

概述

本节分析客户端消息的处理流程,重点关注 Watcher 及 DataCallback,并以 getData 的同步和异步方法为例进行说明。


RPC方法流程.png

处理流程

客户端请求处理流程.png
1.示例代码
/**
 * Minimal ZooKeeper client demo that reads {@code /test1} twice: once through the
 * blocking {@code getData} overload and once through the callback-based overload.
 *
 * <p>The same instance acts as the session watcher, the data watcher and the
 * asynchronous {@code DataCallback}.
 */
public class DemoTest implements Watcher, AsyncCallback.DataCallback {
    private static final Stat stat = new Stat();

    /**
     * Released by {@link #processResult} so that {@code main} can wait for the
     * asynchronous read to complete. Fully-qualified to avoid touching the imports.
     */
    private final java.util.concurrent.CountDownLatch asyncDone =
            new java.util.concurrent.CountDownLatch(1);

    /**
     * Runs the synchronous and asynchronous reads against a local server.
     *
     * @param args unused
     * @throws KeeperException if the synchronous read fails on the server side
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IOException if the client cannot be created
     */
    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        DemoTest demoTest = new DemoTest();
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, demoTest);
        try {
            // Synchronous read: blocks until the response arrives (payload unused in this demo).
            byte[] bytes = zooKeeper.getData("/test1", demoTest, stat);
            // Asynchronous read: returns immediately, result is delivered to processResult.
            zooKeeper.getData("/test1", demoTest, demoTest, "异步回调需要传递的数据");
            // The client's worker threads are daemon threads, so without this wait the JVM
            // could exit before the callback runs. Bounded so the demo cannot hang forever.
            demoTest.asyncDone.await(10, java.util.concurrent.TimeUnit.SECONDS);
        } finally {
            // Release the session instead of leaving it to expire on the server.
            zooKeeper.close();
        }
    }

    /**
     * Watcher callback: prints every watch notification delivered to this client.
     *
     * @param event the notification received
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("接收到watch通知:" + event);
    }

    /**
     * Asynchronous {@code getData} callback: prints the path and the caller-supplied
     * context, then signals {@code main} that the asynchronous read has finished.
     *
     * @param rc result code of the call
     * @param path path the request was issued for
     * @param ctx context object passed to the asynchronous call
     * @param data node data (unused here)
     * @param stat node stat (unused here)
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        try {
            System.out.println("结果回调:" + path + "----" + ctx);
        } finally {
            asyncDone.countDown();
        }
    }
}

2.queuePacket构造packet入队outgoingQueue

/**
 * Builds a {@code Packet} for the given request and either enqueues it on
 * {@code outgoingQueue} or hands it to {@code conLossPacket} when the client is
 * not alive or is already closing. Finally wakes the client socket.
 *
 * <p>{@code cb} is {@code null} for synchronous requests (see {@code submitRequest}).
 *
 * @return the packet that was created, whether or not it was enqueued
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration){
    final Packet queued;
    synchronized (outgoingQueue) {
        queued = new Packet(h, r, request, response, watchRegistration);
        queued.cb = cb;
        queued.ctx = ctx;
        queued.clientPath = clientPath;
        queued.serverPath = serverPath;

        // Same test as "!alive || closing", with the branches swapped.
        final boolean accepting = state.isAlive() && !closing;
        if (accepting) {
            // A close-session request flips the client into the closing state
            // before the packet itself is queued.
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // Enqueue for sending.
            outgoingQueue.add(queued);
        } else {
            // Not accepting new work: conLossPacket (defined elsewhere) deals with the packet.
            conLossPacket(queued);
        }
    }
    // Wake the socket layer; the article's author notes this corresponds to
    // selector.wakeup() -- not verifiable from this snippet.
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return queued;
}

3.同步处理流程:阻塞等待响应

/**
 * Submits a request synchronously: queues a packet without a callback and
 * blocks the calling thread until that packet is marked finished.
 *
 * @return the reply header that was attached to the queued packet
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    final ReplyHeader reply = new ReplyHeader();
    // No callback, paths or context: this is the blocking path.
    final Packet pending = queuePacket(h, reply, request, response,
            null, null, null, null, watchRegistration);
    synchronized (pending) {
        // Loop on the flag rather than a single wait(); finishPacket sets
        // "finished" and calls notifyAll() on this same packet.
        while (!pending.finished) {
            pending.wait();
        }
    }
    return reply;
}

4.异步处理流程:请求入队后直接返回

5.doTransport

6.readResponse处理响应

/**
 * Decodes one server response from {@code incomingBuffer}, matches it against the
 * head of {@code pendingQueue}, copies the reply header into the waiting packet,
 * deserializes the response body on success, and finally completes the packet.
 *
 * NOTE: part of the method body is elided in this excerpt (the "......" line).
 *
 * @param incomingBuffer buffer holding the serialized reply header and body
 * @throws IOException if no request is pending, or if the reply xid does not
 *         match the xid of the oldest pending request
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // Deserialize the reply header.
    replyHdr.deserialize(bbia, "header");
    
    // (code elided by the article's author)
    ......

    Packet packet;
    synchronized (pendingQueue) {
        // A reply with nothing pending cannot be matched to any request.
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // Take the oldest pending request.
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response to the first request!
     * (Requests are handled in order, so responses must be handled in order too.)
     */
    try {
        // Compare the reply xid with the original request xid to enforce ordering.
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            // Mark the packet as connection-loss before failing; the finally
            // block below still completes it.
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    + replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet );
        }
        // Copy the reply header fields into the packet.
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        // Only a positive zxid updates lastZxid.
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        // Deserialize the response body, only when one is expected and the call succeeded.
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // Always complete the packet (including the xid-mismatch path);
        // finishPacket handles watch registration and notifies the caller.
        finishPacket(packet);
    }
}

7.finishPacket

/**
 * Completes a packet: registers its watch (if any) using the reply error code,
 * marks it finished, and then either wakes the thread blocked on the packet
 * (no callback) or hands it to the event thread (callback present).
 *
 * @param p the packet whose response has been processed
 */
private void finishPacket(Packet p) {
    // A watch registration is present only when the request carried one.
    if (p.watchRegistration != null) {
        final int err = p.replyHeader.getErr();
        p.watchRegistration.register(err);
    }

    // Asynchronous request: the callback runs on the event thread.
    if (p.cb != null) {
        p.finished = true;
        // The article's author notes this adds the packet to the waitingEvents queue.
        eventThread.queuePacket(p);
        return;
    }

    // Synchronous request: flip the flag and wake whoever is waiting on the
    // packet's monitor (submitRequest waits on this same object).
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}
// Watcher registries, one per watch kind. Each maps a String key to the set of
// Watchers registered under it (presumably the key is the znode path -- confirm
// against the code that populates these maps).
// NOTE(review): plain HashMaps; any synchronization must be done by the code
// that accesses them, which is not visible here.
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();

8.processEvent处理事件

/**
 * Dispatches a completed packet to its asynchronous callback.
 *
 * NOTE: this excerpt is abridged -- the "......" line stands for branches the
 * article omitted, so the braces shown here do not balance on their own.
 *
 * @param event the queued event; treated as a {@code Packet} in the visible code
 */
private void processEvent(Object event) {
      try {
              Packet p = (Packet) event;
              // rc stays 0 on success, otherwise carries the reply header's error code.
              int rc = 0;
              String clientPath = p.clientPath;
              if (p.replyHeader.getErr() != 0) {
                  rc = p.replyHeader.getErr();
              }
              
              // (other response types elided by the article's author)
              ......
              
              else if (p.response instanceof GetDataResponse) {
                  // getData: deliver data and stat on success, nulls on failure.
                  DataCallback cb = (DataCallback) p.cb;
                  GetDataResponse rsp = (GetDataResponse) p.response;
                  if (rc == 0) {
                      cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
                  } else {
                      cb.processResult(rc, clientPath, p.ctx, null, null);
                  }
              } 
          }
      } catch (Throwable t) {
          // Anything thrown while dispatching (including from the user callback
          // invoked above) is logged rather than propagated.
          LOG.error("Caught unexpected throwable", t);
      }
}
上一篇下一篇

猜你喜欢

热点阅读