zookeeper源码分析(2)-客户端启动流程
zookeeper原生客户端启动流程
客户端整体结构如下:
核心组件
ZooKeeper
客户端的入口,负责启动整个客户端。持有
ClientCnxn
和ZKWatchManager
的实例,提供了客户端对节点操作的方法。ZKWatchManager
watcher机制是zookeeper的一大特性,实现了分布式数据变化对客户端的实时通知功能,需要服务端和客户端的共同实现,可参考Zookeeper Watcher机制。包括对节点数据内容,子节点变化和默认none类型事件的三种watch.
ClientCnxn
管理客户端的socket i/o,包含两个核心线程。
SendThread
负责与服务端之间的读写网络通信和心跳维持。EventThread
主要对经过SendThread
反序列化后的具体请求根据类型的不同,具体处理,同时也会负责对请求事件的watch回调。此外ClientCnxn含有HostProvider
对象,能够在会话未过期前与当前服务端连接中断时,自动选择一个可用的服务端重建连接。将在初始化过程中详细介绍。StaticHostProvider
zookeeper对
HostProvider
接口的简单实现,持有服务端的地址列表,可通过轮询的方式获得服务端的所有地址。HostProvider
接口定义如下:
public interface HostProvider {
//返回服务器地址的总大小
public int size();
/**
* The next host to try to connect to.
*
* For a spinDelay of 0 there should be no wait.
* 必须返回已被解析的InetSocketAddress对象
*/
public InetSocketAddress next(long spinDelay);
/**
* Notify the HostProvider of a successful connection.
* The HostProvider may use this notification to reset it's inner state.
*/
public void onConnected();
}
在对StaticHostProvider
的初始化时,会将解析出来的InetSocketAddress对象随机打散,防止多个客户端均连接到一台服务器上
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
throws UnknownHostException {
·············省略List<InetSocketAddress> serverAddresses的构造过程·····················
Collections.shuffle(this.serverAddresses);
}
主要看下StaticHostProvider.next
的实现
//记录当前客户端奕尝试连接完所有服务端的位置
private int lastIndex = -1;
//记录当前客户端尝试连接的服务端位置
private int currentIndex = -1;
public InetSocketAddress next(long spinDelay) {
++currentIndex;
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
return serverAddresses.get(currentIndex);
}
实现逻辑就是从位于第一个的服务器开始,挨个向后获取下一个可尝试连接的服务器地址,如果走了一轮,则在下一轮获取第一个服务器前,需要睡眠spinDelayms,因为这种情况很可能服务器集群出现了某种故障。流程示意图如下:
环形地址列表队列
初始化过程
直接初始化一个ZooKeeper客户端对象,就是客户端的启动入口。如:
ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER, ZK_CONNECTION_TIMEOUT, null);
最终构造方法为:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
//设置默认defaultWatcher
//private final ZKWatchManager watchManager = new ZKWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
流程为:
1.设置默认defaultWatcher
主要是用来对事件类型为EventType.None类型的回调。
2.初始化StaticHostProvider
主要是将服务端地址列表字符串解析为List<InetSocketAddress> serverAddresses
,可供socket连接使用
3.初始化ClientCnxn
首先看下ClientCnxn
的主要成员变量
//Packet:封装了客户端一次请求或服务端一次响应的完整数据
//已经发送但是等待服务端响应的packet集合
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
//需要发送的packet集合
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//建立连接的超时时间
private int connectTimeout;
//服务端认为的下一次会话过期的具体时间
private volatile int negotiatedSessionTimeout;
//客户端认为的最大会话超时时间,默认为sessionTimeout * 2 / 3
private int readTimeout;
//会话的超时时间
private final int sessionTimeout;
private long sessionId;
private byte sessionPasswd[] = new byte[16];
//客户端的命名空间,客户端所有的数据节点的路径都会默认在这层路径下创建。可通过`connectString`参数
//传入,如`127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a`,则`chrootPath=/app/a
final String chrootPath;
//用于记录客户端请求发起的先后顺序,没发送一个packet出去加1
private int xid = 1;
//客户端连接状态,面向与服务端的连接状态
private volatile States state = States.NOT_CONNECTED;
final SendThread sendThread;
final EventThread eventThread;
private final ZooKeeper zooKeeper;
private final ClientWatchManager watcher;
private final HostProvider hostProvider;
构造方法为:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
//是否为只读客户端,默认不是
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
初始化sendThread的构造方法为:
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
主要是将当前客户端连接状态置为States.CONNECTING,并初始化了ClientCnxnSocket
ClientCnxnSocket :
负责实现更底层与服务端的socket io,默认实现为ClientCnxnSocketNIO
,即NIO方式,它的成员变量为:
//socket是否未初始化
protected boolean initialized;
//已经发送的请求数量
protected long sentCount = 0;
//已经接收的响应数量
protected long recvCount = 0;
//最后一次接收响应的时间
protected long lastHeard;
//最后一次发送请求的时间
protected long lastSend;
//记录当下的时间
protected long now;
protected ClientCnxn.SendThread sendThread;
//输出日志和异常信息时使用
protected long sessionId;
//NIO相关
private final Selector selector = Selector.open();
private SelectionKey sockKey;
/**
* This buffer is only used to read the length of the incoming message.
*/
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
/**
* After the length is read, a new incomingBuffer is allocated in
* readLength() to receive the full message.
*/
protected ByteBuffer incomingBuffer = lenBuffer;
ClientCnxnSocket底层梳理IO的方法为doTransport
,将在后续介绍。
eventThread:
事件处理线程,同样为守护线程,主要成员变量为:
//待处理事件的集合
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
//面向事件处理
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
4.启动ClientCnxn
public void start() {
sendThread.start();
eventThread.start();
}
可以看到分别启动了sendThread和eventThread线程
sendThread.run
public void run() {
//初始化clientCnxnSocket.sendThread和clientCnxnSocket.sessionId,后者此时为0
clientCnxnSocket.introduce(this,sessionId);
//更新clientCnxnSocket.now为当前时间
clientCnxnSocket.updateNow();
//更新clientCnxnSocket.lastSend和clientCnxnSocket.lastHeard为当前时间
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
//连接状态不为States.CLOSED 或 States. AUTH_FAILED
try {
//当clientCnxnSocket.sockKey == null时,说明底层连接已和服务端断开
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
//不是第一次连接且连接断开
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
//非客户端主动关闭或认证失败,连接断开时客户端会重新连接服务端
//也是第一次连接的入口
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
······················省略sasl认证··················
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//重置连接可用时间
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
·····················省略定时心跳发送和检测···················
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+ Long.toHexString(getSessionId())
+ " for server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
//说明线程出现异常,清空pendingQueue和outgoingQueue,需要等待客户端重新连接
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
第一次创建连接的方法为sendThread.startConnect
private void startConnect() throws IOException {
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
··············省略zookeeper Sasl认证校验的过程,如果失败会发送EventType.None、KeeperState.AuthFailed的事件···············
clientCnxnSocket.connect(addr);
}
实际建立连接的方法为clientCnxnSocket.connect
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
// Reset incomingBuffer
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//因为SocketChannel sock是非阻塞的,此时并没有马上建立真正的连接就会返回true
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
当连接返回时,会调用sendThread.primeConnection
构建发送队列的packet
void primeConnection() throws IOException {
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
//只注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件
clientCnxnSocket.enableReadWriteOnly();
}
注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件到selector上
发送队列outgoingQueue从头到尾添加的packet的请求内容依次为:ConnectRequest ->AuthPacket(如果需要认证的话)->SetWatches (ps:当前watch信息,是需要服务端响应的请求,所以每次成功建立连接都需要将当前客户端的watch信息发送给连接的服务端)
之后run方法会调用clientCnxnSocket.doTransport
,也是处理整个IO的方法
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
此时selector上注册的事件为读写事件,将会调用doIO
,
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
//第一次连接调到此处
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
首先会触发写事件sockKey.isWritable(),(因为写缓冲区可写但此时服务端并不会发送数据到客户端)将从outgoingQueue取出第一个ConnectRequest 的packet发送出去,然后取消写事件,等待服务端的响应
当除发读事件sockKey.isReadable()时,读取服务端的连接响应数据readConnectResult
void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
会将响应数据反序列化为ConnectResponse conRsp,从中读取sessionId ,并调用连接回调函数sendThread.onConnected
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
//lastIndex = currentIndex
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
1.根据服务端认可的下次会话过期时间,重置客户端的negotiatedSessionTimeout
,readTimeout
和connectTimeout
2.回调hostProvider,重置环形地址列表的指针lastIndex
3.设置sessionId ,sessionPasswd
4.根据是否只读,设置连接状态state= States.CONNECTEDREADONLY或 States.CONNECTED
5.发送连接状态事件给eventThread
接下来当一直是连接状态的时候,sendThread就开始不停的处理读写请求数据和定时发送心跳,可参考会话管理
eventThread.run
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
主要流程为:
1.读取请求事件Object event = waitingEvents.take()
2.处理请求事件processEvent(event)
,再此先不展开事件具体处理。
参考资料:ZooKeeper的Java客户端使用
感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~