How Does ZooKeeper Communicate over the Network and Listen for Changes? A Look at the Watch Mechanism

2023-02-25  分布式与微服务

ZooKeeper Watch Mechanism

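A Watcher is a simple mechanism that lets a client be notified of changes in the ZooKeeper ensemble. A client can set a Watcher when it reads a particular znode; when that znode changes, a notification is sent to the client that registered the Watcher.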

1. Overview

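The ZooKeeper Watch mechanism means that, on its read commands, the client tells the server: notify me when this node or its children change. Specifically, the operations that can set a watch are the read operations getData, getChildren and exists.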

For example, on the command line we can type get -w /foo; the -w flag tells the ZooKeeper server that the current client wants to set a watcher on the /foo node.

Two details of the ZooKeeper Watch mechanism:

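This post examines the ZooKeeper Watch mechanism from the client's side, starting at the lowest layer. Before we begin, consider the following questions and keep them in mind while studying the ZooKeeper client.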

2. Client Network I/O Model

Copied from: ZooKeeper客户端源码解读(网络I/O) (ZooKeeper Client Source Code Walkthrough: Network I/O)

2.1 Overall Structure

ClientCnxnSocket encapsulates the underlying socket communication layer. Its overall structure is shown in the figure below:


2.2 Packet

Packet is a protocol-layer wrapper defined inside ClientCnxn; it is the carrier for requests and responses in ZooKeeper.



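As the figure above shows, a Packet holds the request header, reply header, request body, response body, node path, the registered Watcher and other information.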
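To make that field list concrete, here is a heavily simplified stand-in for Packet. It is a hypothetical model written for illustration, not ZooKeeper's class: SimplePacket, complete() and carriesWatch() are made-up names, and plain JDK types replace the jute records. Only the finished flag mirrors a real field, the one submitRequest() waits on.

```java
// Hypothetical, simplified model of ClientCnxn.Packet (not ZooKeeper's class).
// The real fields are jute records (RequestHeader, ReplyHeader, Record) and a
// WatchRegistration; plain JDK types are used so the sketch compiles on its own.
class SimplePacket {
    // Request side
    final int xid;                   // request id (from the request header)
    final int opType;                // operation code (from the request header)
    final byte[] requestBody;        // serialized request body
    final String clientPath;         // node path as the client sees it
    final Object watchRegistration;  // watcher to register on success, or null

    // Response side, filled in when the reply arrives
    int replyErr;                    // error code from the reply header, 0 = OK
    byte[] responseBody;             // serialized response body
    boolean finished;                // true once the reply has been processed

    SimplePacket(int xid, int opType, byte[] requestBody, String clientPath, Object watchRegistration) {
        this.xid = xid;
        this.opType = opType;
        this.requestBody = requestBody;
        this.clientPath = clientPath;
        this.watchRegistration = watchRegistration;
    }

    // Records the server's answer and marks the packet as done.
    void complete(int err, byte[] body) {
        this.replyErr = err;
        this.responseBody = body;
        this.finished = true;
    }

    // A packet "carries a watch" when a watcher should be registered after the reply.
    boolean carriesWatch() {
        return watchRegistration != null;
    }
}
```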

2.3 SendThread

2.3.1 Basic Concepts

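SendThread is the core I/O scheduling thread inside the client's ClientCnxn; it manages all network I/O between the client and the server. While the ZooKeeper client is running, SendThread keeps trying to interact with the ZooKeeper server: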

// org.apache.zookeeper.ClientCnxn.SendThread
@Override
public void run() {
    // ...
    while (state.isAlive()) {
        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
    }
    // ...
}

// org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
void doTransport(...) {
    ...
    // Poll the Selector and handle keys that are ready for reading or writing
    for (SelectionKey k : selected) {
        ...
        if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // Perform the actual I/O
            doIO(pendingQueue, cnxn);
        }
        ...
    }
}

// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sockKey.isReadable()) {
        // Read path
    }

    if (sockKey.isWritable()) {
        // Write path
    }
}
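The doTransport/doIO pattern (one Selector, with a read or write step chosen from each key's ready ops) can be tried out with the JDK alone. The sketch below works under clear assumptions: an in-process java.nio.channels.Pipe replaces the TCP SocketChannel, so the sink end plays the write role and the source end plays the read role, and MiniTransport and roundTrip are hypothetical names. Like the real client, it drops OP_WRITE interest once there is nothing left to send.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical selector-driven I/O loop in the spirit of ClientCnxnSocketNIO#doTransport/doIO.
// A Pipe stands in for the TCP socket: sink = write side, source = read side.
class MiniTransport {

    // Writes every queued message through the pipe and returns what the read side received.
    static String roundTrip(Queue<String> outgoing) {
        int expected = 0;
        for (String s : outgoing) {
            expected += s.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                SelectionKey writeKey = sink.register(selector, SelectionKey.OP_WRITE);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer current = null;                    // message currently being written
                ByteBuffer readBuf = ByteBuffer.allocate(64); // scratch buffer for reads
                while (received.size() < expected) {
                    selector.select();                        // block until some channel is ready
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey k = it.next();
                        it.remove();
                        if (k.isWritable()) {
                            // Write step: start the next message once the previous one is fully sent
                            if (current == null || !current.hasRemaining()) {
                                String next = outgoing.poll();
                                if (next == null) {
                                    writeKey.interestOps(0);  // nothing left to send: stop asking for OP_WRITE
                                    continue;
                                }
                                current = ByteBuffer.wrap(next.getBytes(StandardCharsets.UTF_8));
                            }
                            sink.write(current);
                        } else if (k.isReadable()) {
                            // Read step: drain whatever bytes are available right now
                            readBuf.clear();
                            int n = source.read(readBuf);
                            if (n > 0) {
                                received.write(readBuf.array(), 0, n);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Passing a queue holding "ping" and "getData /foo" returns the two messages concatenated, because the pipe is a plain byte stream; the real client restores message boundaries with a 4-byte length prefix read into lenBuffer.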


2.3.2 outgoingQueue and pendingQueue

    /**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final Queue<Packet> pendingQueue = new ArrayDeque<>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
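The division of labour between the two queues can be modelled in a few lines. This is a hypothetical, single-threaded sketch (ClientQueues, submit, sendOne and onReply are made-up names): a request waits in outgoingQueue until it is written, then moves to pendingQueue, and because replies on one connection come back in request order, each reply is matched against the head of pendingQueue. The sketch also rejects a reply whose xid does not match that head, in the same spirit as the real client's "Xid out of order" check.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical single-threaded model of the outgoingQueue -> pendingQueue hand-off.
class ClientQueues {
    static final class Req {
        final int xid;
        final String op;
        String reply;                   // filled in when the matching response arrives
        Req(int xid, String op) { this.xid = xid; this.op = op; }
    }

    final LinkedBlockingDeque<Req> outgoingQueue = new LinkedBlockingDeque<>(); // waiting to be sent
    final Queue<Req> pendingQueue = new ArrayDeque<>();                          // sent, waiting for a reply
    private int nextXid = 1;

    // Application side: enqueue a request (queuePacket plays this role in ZooKeeper).
    Req submit(String op) {
        Req r = new Req(nextXid++, op);
        outgoingQueue.add(r);
        return r;
    }

    // I/O side, "writable": send the head of outgoingQueue and remember it in pendingQueue.
    Req sendOne() {
        Req r = outgoingQueue.poll();
        if (r != null) {
            pendingQueue.add(r);        // the socket write itself is omitted in this sketch
        }
        return r;
    }

    // I/O side, "readable": a reply belongs to the oldest pending request.
    Req onReply(int xid, String payload) {
        Req r = pendingQueue.poll();
        if (r == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + xid);
        }
        if (r.xid != xid) {
            throw new IllegalStateException("Xid out of order: expected " + r.xid + ", got " + xid);
        }
        r.reply = payload;
        return r;
    }
}
```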

2.3.3 Sending Data

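Under normal conditions (that is, when the TCP connection between client and server is healthy and the session is valid), submitRequest() queues the request as a Packet in outgoingQueue and blocks until the response has been processed; SendThread later writes the Packet to the socket and moves it into pendingQueue: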

// org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);
    // Block the caller until the reply has been processed
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
    // Take a sendable Packet from outgoingQueue
    Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
    ...
    // Send the request
    sock.write(p.bb);
    if (!p.bb.hasRemaining()) {
        sentCount.getAndIncrement();
        outgoingQueue.removeFirstOccurrence(p);
        if (p.requestHeader != null
                && p.requestHeader.getType() != OpCode.ping
                && p.requestHeader.getType() != OpCode.auth) {
            synchronized (pendingQueue) {
                // Fully sent: park it in pendingQueue until the reply arrives
                pendingQueue.add(p);
            }
        }
    }
    ...
}
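submitRequest() relies on the classic guarded-wait idiom: the caller calls wait() in a loop on the packet's monitor until finished is true. For a synchronous call, finishPacket() sets finished and calls notifyAll() on the same monitor. The sketch below reproduces only that hand-off with plain JDK threads; WaitingPacket, await, finish, demo and the background responder thread are hypothetical stand-ins for Packet and SendThread.

```java
// Hypothetical stand-ins for Packet and the SendThread side; only the wait/notify hand-off is modelled.
class WaitingPacket {
    private boolean finished;      // guarded by this packet's monitor
    private String response;

    // Caller side, like submitRequest(): block until the packet is finished.
    String await() throws InterruptedException {
        synchronized (this) {
            while (!finished) {    // loop guards against spurious wake-ups
                wait();
            }
            return response;
        }
    }

    // I/O side, like finishPacket() for a synchronous call: publish the result and wake waiters.
    void finish(String response) {
        synchronized (this) {
            this.response = response;
            this.finished = true;
            notifyAll();
        }
    }

    // Demo: a background thread "receives" the reply while the caller is blocked in await().
    static String demo(String reply) {
        WaitingPacket p = new WaitingPacket();
        Thread responder = new Thread(() -> {
            try {
                Thread.sleep(50);  // pretend the network round trip takes a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            p.finish(reply);
        });
        responder.start();
        try {
            return p.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

The while loop matters: a bare if around wait() could return before finished is set if the thread wakes spuriously.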

2.3.4 Receiving Responses

Once the client has received a complete response from the server, it handles it differently depending on the request type:

// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isReadable()) {
    int rc = sock.read(incomingBuffer);
    ...
    // Act only once the buffer is full: first the length prefix, then the body
    if (!incomingBuffer.hasRemaining()) {
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            readLength();
        } else if (!initialized) {
            ...
        } else {
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}

// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");
    switch (replyHdr.getXid()) {
        ...
        // -1 means notification (WATCHER_EVENT): a watch event pushed by the server
        case NOTIFICATION_XID:
            LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
            WatcherEvent event = new WatcherEvent();
            event.deserialize(bbia, "response");
            ...
            WatchedEvent we = new WatchedEvent(event);
            LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
            // Let EventThread trigger the corresponding watchers
            eventThread.queueEvent(we);
            return;
        default:
            break;
    }

    // Otherwise this is a regular reply to the oldest pending request
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    ...
    // Handles Watcher registration and related logic
    finishPacket(packet);
}
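The branching in readResponse() boils down to one rule: xid -1 (NOTIFICATION_XID) marks an unsolicited watch notification, which goes to EventThread's queue without touching pendingQueue; any other xid is the reply to the oldest pending request. Below is a hypothetical, JDK-only sketch of that routing; ResponseRouter, Reply and the string payloads are invented for illustration, whereas the real code deserializes ReplyHeader and WatcherEvent with jute.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the xid-based routing done in SendThread#readResponse.
class ResponseRouter {
    static final int NOTIFICATION_XID = -1;   // xid used for watch notifications, as in the excerpt

    static final class Reply {
        final int xid;
        final String payload;
        Reply(int xid, String payload) { this.xid = xid; this.payload = payload; }
    }

    final Queue<Integer> pendingQueue = new ArrayDeque<>();                        // xids awaiting replies
    final LinkedBlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>(); // read by an event thread
    final Queue<String> completed = new ArrayDeque<>();                            // finished replies

    void readResponse(Reply reply) {
        if (reply.xid == NOTIFICATION_XID) {
            // Watch notification: not tied to any pending request, hand it to the event thread.
            waitingEvents.add(reply.payload);
            return;
        }
        // Regular reply: it must belong to the oldest pending request.
        Integer expected = pendingQueue.poll();
        if (expected == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + reply.xid);
        }
        if (expected != reply.xid) {
            throw new IllegalStateException("Xid out of order. Got " + reply.xid + ", expected " + expected);
        }
        completed.add(reply.payload);   // stands in for finishPacket(packet)
    }
}
```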

2.4 EventThread

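EventThread holds a waitingEvents queue that temporarily stores the objects waiting to be triggered, including the Watchers registered by the client and the AsyncCallback callbacks registered through the asynchronous APIs.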

Meanwhile, EventThread keeps taking objects out of waitingEvents, identifies their concrete type (Watcher or AsyncCallback), and calls process or processResult respectively to trigger the event or run the callback.

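// org.apache.zookeeper.ClientCnxn.EventThread#run
public void run() {
    ...
    while (true) {
        Object event = waitingEvents.take();
        if (event == eventOfDeath) {
            wasKilled = true;
        } else {
            processEvent(event);
        }
        // ... once wasKilled is set, the loop exits as soon as waitingEvents is empty
    }
    ...
}

// org.apache.zookeeper.ClientCnxn.EventThread#processEvent
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // Watch notification: call every watcher materialized for this event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else {
            // Async API result: invoke the AsyncCallback stored in the Packet
            Packet p = (Packet) event;
            int rc = 0;
            String clientPath = p.clientPath;
            ...
            // e.g. the StatCallback of an exists() call (other callback types elided)
            StatCallback cb = (StatCallback) p.cb;
            cb.processResult(rc, clientPath, p.ctx,
                    ((ExistsResponse) p.response).getStat());
        }
    } catch (Throwable t) {
        // ... unexpected errors are logged
    }
}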
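EventThread is a single consumer draining a blocking queue, and it is stopped with a sentinel object (eventOfDeath) rather than by interruption, so events already queued are still processed. The sketch below mirrors that shape with JDK classes only. MiniEventThread, the Consumer/IntConsumer stand-ins for Watcher and AsyncCallback, and the two event classes are simplifications for illustration, not ZooKeeper's code.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical model of ClientCnxn.EventThread: one consumer thread, a blocking queue,
// type-based dispatch, and a sentinel object used for shutdown.
class MiniEventThread extends Thread {
    // Stand-in for WatcherSetEventPair: the watchers to fire plus the event they receive.
    static final class WatcherSetEventPair {
        final List<Consumer<String>> watchers;
        final String event;
        WatcherSetEventPair(List<Consumer<String>> watchers, String event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    // Stand-in for a Packet that carries an async callback and its result code.
    static final class CallbackPacket {
        final IntConsumer cb;
        final int rc;
        CallbackPacket(IntConsumer cb, int rc) { this.cb = cb; this.rc = rc; }
    }

    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private boolean wasKilled = false;   // only touched by this thread

    void queue(Object event) { waitingEvents.add(event); }

    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processEvent(event);
                }
                // After the sentinel, keep going only until the queue is drained.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processEvent(Object event) {
        if (event instanceof WatcherSetEventPair) {
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Consumer<String> watcher : pair.watchers) {
                try {
                    watcher.accept(pair.event);   // like watcher.process(pair.event)
                } catch (RuntimeException t) {
                    // one failing watcher must not stop the others (ZooKeeper logs the error)
                }
            }
        } else if (event instanceof CallbackPacket) {
            CallbackPacket p = (CallbackPacket) event;
            p.cb.accept(p.rc);                    // like cb.processResult(rc, ...)
        }
    }
}
```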

3. How the ZooKeeper Client-Side Watcher Mechanism Works

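ZooKeeper lets a client register a Watcher with the server; when certain events on the server trigger that Watcher, the server sends an event notification to that client, which provides a distributed notification facility. The ZooKeeper Watcher mechanism consists of three parts: the client threads, the client-side WatchManager, and the ZooKeeper server.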

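  1. The client registers a Watcher with the ZooKeeper server.

  2. Once registration succeeds, ZooKeeper replies to the client.

  3. The client stores the Watcher object in its client-side WatchManager.

  4. When the ZooKeeper server triggers the Watcher event, it sends a notification to the client.

  5. A client thread takes the corresponding Watcher object out of the WatchManager and runs the callback logic.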
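Steps 3 to 5 can be pictured as a map from path to a set of watchers on the client. In ZooKeeper the client-side class is ZKWatchManager, and its materialize() both returns and removes the matching watchers, which is why a standard watch fires only once. The following JDK-only sketch shows that behaviour; MiniWatchManager and its method names are hypothetical, and it tracks only data watches, while the real manager keeps separate tables for data, exist and child watches.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical client-side watch table, modelling only ZKWatchManager's data watches.
class MiniWatchManager {
    private final Map<String, Set<Consumer<String>>> dataWatches = new HashMap<>();

    // Step 3: after the server acknowledges the read, remember the watcher under its path.
    synchronized void addDataWatch(String path, Consumer<String> watcher) {
        dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // Step 5, first half: look up the watchers for a notification AND remove them (one-time trigger).
    synchronized Set<Consumer<String>> materialize(String path) {
        Set<Consumer<String>> found = dataWatches.remove(path);
        return found == null ? Collections.emptySet() : found;
    }

    // Step 5, second half: run the callbacks (EventThread does this in ZooKeeper).
    int deliver(String path, String eventType) {
        Set<Consumer<String>> watchers = materialize(path);
        for (Consumer<String> w : watchers) {
            w.accept(eventType + " " + path);
        }
        return watchers.size();
    }
}
```

A second change to the same node therefore reaches no watcher unless the client reads the node again with a watch.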

Taking the getData API as an example, let's walk through the client-side registration logic:

Registration

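  1. When it sends a request that carries a watch, the client first marks the request as a watched request (request.setWatch(watcher != null)) and sends it to the server.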
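     // org.apache.zookeeper.ZooKeeper#getData
     public byte[] getData(final String path, Watcher watcher, Stat stat)
             throws KeeperException, InterruptedException {
         final String clientPath = path;
         ...
         // Wrap the watcher; it is registered locally only after the server replies
         WatchRegistration wcb = null;
         if (watcher != null) {
             wcb = new DataWatchRegistration(watcher, clientPath);
         }
         final String serverPath = prependChroot(clientPath);
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.getData);
         GetDataRequest request = new GetDataRequest();
         request.setPath(serverPath);
         // Tell the server that this read should leave a watch behind
         request.setWatch(watcher != null);
         GetDataResponse response = new GetDataResponse();
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         ...
         return response.getData();
     }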

     // org.apache.zookeeper.ClientCnxn#submitRequest
     public ReplyHeader submitRequest(RequestHeader h, Record request,
             Record response, WatchRegistration watchRegistration)
             throws InterruptedException {
         ReplyHeader r = new ReplyHeader();
         Packet packet = queuePacket(h, r, request, response, null, null, null,
                 null, watchRegistration);
         synchronized (packet) {
             while (!packet.finished) {
                 packet.wait();
             }
         }
         return r;
     }

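  2. The "send" in the previous step really just means writing the Packet into outgoingQueue, where it waits for SendThread to send it.
  3. SendThread, which handles server responses, receives the server's reply in its readResponse method and finally calls finishPacket() to register the Watcher in ZKWatchManager.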
     // org.apache.zookeeper.ClientCnxn#finishPacket
     private void finishPacket(Packet p) {
         int err = p.replyHeader.getErr();
         if (p.watchRegistration != null) {
             // Adds the watcher to ZKWatchManager, depending on the result code err
             p.watchRegistration.register(err);
         }
         ...
     }
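finishPacket() passes the reply's error code to WatchRegistration.register(), and the watcher is only stored when the result code allows it, so a failed getData leaves no watcher behind. In ZooKeeper this check is shouldAddWatch(rc), which by default returns rc == 0; ExistsWatchRegistration also accepts NONODE, since exists() may watch a node that does not exist yet. The sketch below copies that rule with JDK types; WatchRegistrationSketch, Registration and ExistsRegistration are hypothetical names, and watchers are plain strings here.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical re-creation of the WatchRegistration.register(rc) rule with JDK types only.
class WatchRegistrationSketch {
    static final int OK = 0;
    static final int NONODE = -101;   // numeric value of KeeperException.Code.NONODE

    // Like DataWatchRegistration: default rule, only a successful call leaves a watch.
    static class Registration {
        final Map<String, Set<String>> watches;  // path -> watcher names (ZooKeeper stores Watcher objects)
        final String clientPath;
        final String watcher;

        Registration(Map<String, Set<String>> watches, String clientPath, String watcher) {
            this.watches = watches;
            this.clientPath = clientPath;
            this.watcher = watcher;
        }

        boolean shouldAddWatch(int rc) {
            return rc == OK;
        }

        // Called from finishPacket() with the error code taken from the reply header.
        void register(int rc) {
            if (shouldAddWatch(rc)) {
                synchronized (watches) {
                    watches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
                }
            }
        }
    }

    // Like ExistsWatchRegistration: exists() may watch a node that does not exist yet.
    static class ExistsRegistration extends Registration {
        ExistsRegistration(Map<String, Set<String>> watches, String clientPath, String watcher) {
            super(watches, clientPath, watcher);
        }

        @Override
        boolean shouldAddWatch(int rc) {
            return rc == OK || rc == NONODE;
        }
    }
}
```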

How the client processes callbacks

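  1. The client handles all server responses in SendThread.readResponse(). When the request header identifies a message as an event notification, the client first deserializes the received bytes into a WatcherEvent object, then calls eventThread.queueEvent() to hand the event over to the EventThread.
  2. Based on the event type, it looks up the Watchers the client has registered in ZKWatchManager. Once found, those Watchers are removed from ZKWatchManager.
  3. The Watchers found are stored in the waitingEvents queue; EventThread's run method loops over waitingEvents, taking out the pending Watcher events and processing them.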
     public void queueEvent(WatchedEvent event) {
         if (event.getType() == EventType.None
                 && sessionState == event.getState()) {
             return;
         }
         sessionState = event.getState();

         // materialize the watchers based on the event
         WatcherSetEventPair pair = new WatcherSetEventPair(
                 watcher.materialize(event.getState(), event.getType(),
                         event.getPath()),
                 event);
         // queue the pair (watch set & event) for later processing
         waitingEvents.add(pair);
     }
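Two things happen in queueEvent() before anything is queued. First, connection-state events (type None) are dropped when the state has not changed, so a repeated state such as a second SyncConnected does not wake watchers again; then materialize() turns the event into a concrete watcher set. A JDK-only sketch of the first rule follows; the enums are trimmed copies of Watcher.Event.EventType and KeeperState, StateFilter is a hypothetical name, and the Disconnected starting state is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the duplicate-state filter at the top of EventThread#queueEvent.
class StateFilter {
    enum EventType { None, NodeDataChanged }                   // trimmed: the real enum has more values
    enum KeeperState { Disconnected, SyncConnected, Expired }  // trimmed as well

    static final class Event {
        final EventType type;
        final KeeperState state;
        final String path;
        Event(EventType type, KeeperState state, String path) {
            this.type = type;
            this.state = state;
            this.path = path;
        }
    }

    private KeeperState sessionState = KeeperState.Disconnected;  // last state seen (assumed start)
    final List<Event> waitingEvents = new ArrayList<>();

    void queueEvent(Event event) {
        // A pure state event that repeats the current state carries no news: drop it.
        if (event.type == EventType.None && sessionState == event.state) {
            return;
        }
        sessionState = event.state;
        waitingEvents.add(event);   // ZooKeeper materializes the watcher set before queueing
    }
}
```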

4. ZkClient "Takes Over" the EventThread

The commonly used ZkClient is itself a Watcher:

     public class ZkClient implements Watcher {
         // ...
     }

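When it creates the ZooKeeper client, ZkClient passes itself in as the default watcher. After that it no longer supplies separate Watcher objects when setting watches, only the boolean watch flag, so every registered event is handled by ZkClient itself.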

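In other words, ZkClient takes over the handling of every event that comes out of waitingEvents and passes it to an event queue that ZkClient implements internally.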
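The takeover can be sketched with JDK classes: the object registered as ZooKeeper's default watcher does almost nothing inside process(), it only hands the event to its own queue, and a separate thread owned by the wrapper does the real work. One effect of this design is that slow listener code never blocks ZooKeeper's single EventThread. ForwardingClient, its methods and the empty-string shutdown marker are hypothetical; this illustrates the idea and is not ZkClient's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper in the style described for ZkClient: it acts as the default watcher,
// and process() only forwards events to the wrapper's own queue and thread.
class ForwardingClient {
    private final LinkedBlockingQueue<String> ownEvents = new LinkedBlockingQueue<>();
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private final Thread ownEventThread;

    ForwardingClient() {
        ownEventThread = new Thread(() -> {
            try {
                while (true) {
                    String event = ownEvents.take();
                    if (event.isEmpty()) {            // empty string = shutdown marker in this sketch
                        return;
                    }
                    handled.add("handled " + event);  // real listener logic would run here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "own-event-thread");
        ownEventThread.start();
    }

    // Would be Watcher#process(WatchedEvent), called on ZooKeeper's EventThread: just enqueue.
    void process(String event) {
        ownEvents.add(event);
    }

    // Stops the wrapper's thread after everything queued so far has been handled.
    boolean close(long timeoutMillis) {
        ownEvents.add("");
        try {
            ownEventThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !ownEventThread.isAlive();
    }
}
```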
