zk源码阅读13:watch运行机制
摘要
前面讲完了watch的数据结构以及在client和server端的统一管理
这一节讲解watch的流程机制,不过这一系列本来是先讲zk的数据结构的,讲到watch数据结构,不讲watch的机制又不合理
所以这一节会顺带简单讲解zk client与server的交互
本节主要讲解
client端注册watcher
client端watcher的注册,管理
client端watcher在网络请求中的体现
client端接收server回复时,watcher的注册
server端处理watcher
收到client请求时进行ServerCnxn的注册
触发事件时,通过WatchManager找到相应Watcher(ServerCnxn),进而通知对该事件感兴趣的client
client端回调watcher
client端接收server的通知,调用queueEvent函数放在waitingEvents队列中
ClientCnxn.EventThread#run调用ClientCnxn.EventThread#processEvent,消费waitingEvents,回调watcher.process()
client端注册watcher
注册方式
注册方式主要分为两种,都在Zookeeper类中,所在的方法列举如下
1.注册默认watcher
public synchronized void register(Watcher watcher)
构造函数
2.getData,getChildren和exist请求中注册自定义Watcher
public Stat exists(final String path, Watcher watcher)
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
第一种默认实现是client构造Zookeeper对象时传递,会记录在ZooKeeper.ZKWatchManager#defaultWatcher中,这种可以理解为"假"注册(自己理解),因为它只是记录了默认的watcher,但是并不一定存在于请求中
第二种是client特定请求注册特定的watcher,这种可以理解为"真"注册,只要请求得到的response没问题,就会有相应的记录存在于ZooKeeper.ZKWatchManager中
但是对于ZK API请求来说,用不用默认watcher,底层注册逻辑都是一样的
比如getData利用默认的watcher,源码如下
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);//实际上就是调用记录好的defaultWatcher
}
实际上调用的还是public byte[] getData(final String path, Watcher watcher, Stat stat)函数,底层调用的和不用默认watcher函数是一样的
client中watcher的注册,管理
通过ZooKeeper.WatchRegistration进行管理,类以及子类如下
WatchRegistration以及子类源码如下
abstract class WatchRegistration {//client中管理watch注册的类
private Watcher watcher;//注册的watcher
private String clientPath;//监听的znode path
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
//根据response的resultCode来获取所有注册的path以及对应的watcher集合
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
* add the watch on the path.
*/
public void register(int rc) {//根据response的resultCode来注册watcher到一个path
if (shouldAddWatch(rc)) {//如果可以添加
Map<String, Set<Watcher>> watches = getWatches(rc);//获取所有注册的path以及对应的watcher集合
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);//找到该path
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);//添加当前watcher
}
}
}
/**
* Determine whether the watch should be added based on return code.
* @param rc the result code of the operation that attempted to add the
* watch on the node
* @return true if the watch should be added, otw false
*/
protected boolean shouldAddWatch(int rc) {//根据resultCode判断是否可以添加watch
return rc == 0;
}
}
属性clientPath和watcher分别是监听关注的znode的path和对应处理的watcher
注册逻辑就是根据response的resultCode,判断是否可以添加watch,可以添加的话,就在Map<String, Set<Watcher>>添加记录
这里可能疑惑,这个abstract protected Map<String, Set<Watcher>> getWatches(int rc);的实现是怎样的
DataWatchRegistration getWatches返回 ZooKeeper.ZKWatchManager#dataWatches
ExistsWatchRegistration getWatches返回
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
ChildWatchRegistration getWatches返回 ZooKeeper.ZKWatchManager#childWatches
这里都涉及到ZKWatchManager的三个Map,dataWatches,existWatches,childWatches 具体可以参考前面watch的第一节client存储相关部分
watcher在请求中,通过标志位发送给server
接着跟进上面的getData函数
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {//如果有watcher,就注册
wcb = new DataWatchRegistration(watcher, clientPath);//生成一个DataWatchRegistration,即Data的watch的注册
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();//生成请求头
h.setType(ZooDefs.OpCode.getData);//设置请求类型为getData
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);//设置标志位,是否函数watch
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);//client端提交请求
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
上面函数中,只要注意有注释的部分,即watch相关部分的代码
里面调用了ClientCnxn#submitRequestsubmitRequest 源码如下
//提交请求
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();//生成回复头
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {//如果packet没有处理完,就一直等着
packet.wait();
}
}
return r;
}
里面调用了ClientCnxn#queuePacket函数
在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。
Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。
在 ClientCnxn 中 WatchRegistration 也会被封装到 Packet 中,调用 queuePacket放入outgoingQueue即发送队列中(生产packet)
然后SendThread 线程调用doTransport方法,从outgoingQueue中消费Packet,客户端发送
queuePacket函数作为生产者,代码中调用
outgoingQueue.add(packet);
然后ClientCnxn.SendThread作为消费者,run方法中调用ClientCnxnSocket#doTransport
参考实现类ClientCnxnSocketNIO#doTransport
里面调用了ClientCnxnSocketNIO#doIO
此时是发送请求,调用了ClientCnxn.Packet#createBB
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");//序列化请求头,包含xid和type
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");//序列化request(对于特定请求如GetDataRequest,包含了是否存在watcher的标志位)
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
这里有一点值得注意:
client的watcher并没有进行网络传输,server并不知道client的watcher触发时process函数要怎么执行
但是对于特定请求类型比如GetDataRequest,序列化的时候会传递一个标志位watch,表示是否watch
server在处理的时候,只知道client是否watch某个path
发送请求的时候,watcher还并没有注册在client端,要等server的返回
请求回复后,watcher在client端注册
ClientCnxn.SendThread中,读取server的回复
调用了ClientCnxnSocketNIO#doTransport
调用了ClientCnxnSocketNIO#doIO
调用了ClientCnxn.SendThread#readResponse
调用了ClientCnxn#finishPacket
private void finishPacket(Packet p) {//Packet请求发送,收到回复,进行处理之后
if (p.watchRegistration != null) {//如果有要注册的watchRegistration
p.watchRegistration.register(p.replyHeader.getErr());//根据response code进行注册
}
if (p.cb == null) {//如果没有异步回调
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {//如果有异步回调
p.finished = true;
eventThread.queuePacket(p);
}
}
里面只要注意watchRegistration最终进行了注册就行,记录在WatchRegistration的三个实现类中
client注册watcher的小结
流程用自己的话说,如下
1.client发送getData,getChildren,exist请求时,传入自定义的watcher,或利用ZooKeeper构造函数的默认Watcher
2.将请求封装为Packet,在RequestHeader记录是否需要watcher,记录放入生产者队列ClientCnxn#outgoingQueue
3.ClientCnxn.SendThread消费outgoingQueue
调用ClientCnxnSocketNIO#doTransport
调用ClientCnxnSocketNIO#doIO
调用ClientCnxn.Packet#createBB
序列化的时候,将request记性序列化,里面包含一个是否带有watch的标志位(不包含watcher对象)
4.server进行相应的处理,之后进行回复
可以参考FinalRequestProcessor#processRequest中对于getData的请求处理
利用getDataRequest.getWatch()),看是否client需要watch,进而注册到DataTree的WatchManager中,下面会讲的
5.ClientCnxn.SendThread读取回复
调用ClientCnxnSocketNIO#doTransport
调用ClientCnxnSocketNIO#doIO
调用ClientCnxn.SendThread#readResponse
调用ClientCnxn#finishPacket
利用response code,进行watcher的注册,记录在ZooKeeper.WatchRegistration对应的实现类中
示意图如下,下面会介绍server是如何处理client的watch注册请求的
client注册watch流程
服务端处理watcher
服务端注册watcher
上面只讲了client发送注册watcher的请求,client根据server的response进行watcher的注册,并没有讲解server是怎么处理请求的,这里展开讲解
时序图如下
服务端注册watcher示意图
server前面的调用链这里不展开,感兴趣参考这个链接
就从FinalRequestProcessor#processRequest的处理开始讲
针对getData的请求处理如下
case OpCode.getData: {//getData请求
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);//反序列化出getDataRequest
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}//验证path对应的node是否存在
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);//验证ACL权限
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);//如果有watch标志位,Watcher就传cnxn
rsp = new GetDataResponse(b, stat);
break;
}
注意这里根据GetDataRequest对象的getWatch()方法,即client是否在这个path上有要注册watcher,有的话,就注册cnxn
cnxn是一个ServerCnxn对象,ServerCnxn是什么
类图如下
ServerCnxn抽象类与两个子类
ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接.
实现了Watcher接口,有两个子类
作用就是:
这个Watcher的实现类记录了client和server的连接,回调的时候,可以直接发送response告诉client,有事件触发了
在下面讲server触发Watcher的时候会详细介绍
上面的FinalRequestProcessor#processRequest调用会进入
ZKDatabase#getData
DataTree#getData
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);//注册watcher到dataWatches
}
return n.data;//返回byte[]
}
}
这里稍微提前了一点,讲到了DataTree,好在这里并不是很难理解,因为前面已经讲过WatchManager
WatchManager 负责 Watcher 事件的触发,它是一个统称
在服务端 DataTree 会托管两个 WatchManager,分别是 dataWatches 和 childWatches
分别对应数据变更 Watcher 和子节点变更 Watcher。
因此,结合上面的时序图,就可以理解请求是如何经过ZKdatabase到DataTree最终记录在WatchManager,这里就完成了watcher在服务端的注册
服务端返回getData()给client这个参照时序图就好了,这里不赘述
server触发watch
上面的watch事件是针对getData的,加入这个时候同样path的znode有一个setData请求
(忽略前面的调用链)
进入到DataTree#setData
调用WatchManager#triggerWatch方法
在WatchManger那一节讲过
//从指定的watcher集合supress 中筛选出要触发的watcher,将剩下的watcher执行对应的回调
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
//构建WatchedEvent
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;//如果watchTable没有path这条记录,返回空
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);//在watch2Paths中删掉[watcher,path]这种记录
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {//从supress中过滤掉部分watcher(类似抑制触发)
continue;
}
w.process(e);//没有被抑制的watcher进行回调
}
return watchers;//返回所有触发的watcher
}
这里只要知道调用w.process(e)的时候w是什么
在server端注册watch的时候讲过,是ServerCnxn对象,以NIOServerCnxn这个子类的实现为例
@Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);//xid为-1表示为通知
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();//包装为WatcherEvent来提供网络传输
sendResponse(h, e, "notification");//给client发送请求,通知WatchedEvent的发生
}
也就是说,server触发watcher,回调process函数其实就是告诉需要watch的client,WatcherEvent 发生了
服务端处理watcher小结
注册时watcher是ServerCnxn类型,保存了和client的会话,如果client发送请求的时候,request的标志位watch为true,server才会将这个会话注册到WatchManager(否则server知道client对这个path不感兴趣,下次这个path变化了也不通知你)
触发watcher时,就利用watchManager,找到path对应的watcher即ServerCnxn,告诉连接的client方,发生了WatcherEvent,client自己再处理
client回调watcher
ClientCnxn.SendThread读取回复
调用ClientCnxnSocketNIO#doTransport
调用ClientCnxnSocketNIO#doIO
调用ClientCnxn.SendThread#readResponse
里面处理事件通知的代码段
if (replyHdr.getXid() == -1) {//-1代表通知类型 即WatcherEvent
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");//反序列化WatcherEvent
// convert from a server path to a client path
if (chrootPath != null) {//把serverPath转化成clientPath
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);//WatcherEvent还原成WatchedEvent
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );//加入队列
return;
}
这里将WatchedEvent 放入生成队列,调用ClientCnxn.EventThread#queueEvent
public void queueEvent(WatchedEvent event) {//将WatchedEvent加入队列
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}//验证状态???
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);//用WatcherSetEventPair封装watchers和watchedEvent
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);//加入队列
}
waitingEvents的消费在ClientCnxn.EventThread#run中
最终调用了ClientCnxn.EventThread#processEvent
相应处理回调的代码块是
if (event instanceof WatcherSetEventPair) {//如果是通知事件
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {//从WatcherSetEventPair这个wraper中取出watchers和event
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
可能有个问题,哪里用到了client中watch的记录,进行相应的get和remove操作
答案就在ClientCnxn.EventThread#queueEvent中调用了ClientWatchManager#materialize方法,参考前面讲ClientWatchManager的章节
这样就完成了client中watcher的回调
思考
watcher与client server的网络传输
client在Packet中封装了WatchRegistration,但是在发送给server的时候,并没有传输watcher对象
只是在Packet的Request对象中,存在一个标志位watch
server根据标志位进行处理,有标志位则记录一个ServerCnxn
可以参考GetDataRequest#watch的get和set方法的引用
这也就是Watcher的轻量的特性体现
server watcher的注册
server注册的watcher是ServerCnxn的子类,它记录了client和server的回话
回调函数process就是给client发通知,告诉client发生了怎么样的WatchedEvent
然后client自己根据本地注册的watcher去进行对应的process
client注册的watcher和server注册的watcher有什么区别
作用和类型有区别
client注册的watcher类型没有限制,作用就是说client监控到xx事件后干的事情,比如重新获取数据
server注册的watcher都是ServerCnxn类型,作用就是告诉对应client 发生了xx WatchedEvent就行
由于watcher并没有直接在网络进行传输,所以两者并不一样
server怎么知道一个WatchedEvent触发,要通知哪些client
server的watch是ServerCnxn,保持了和Client的对话,直接回调process就行了
都是ServerCnxn(实现了Watcher)的功劳
触发的粒度是什么,是(path)还是(path,EventType)
是path,监听的时候是监听一个path,只是对不同的 EventType进行不同的处理或者不处理
即使对所有EventType都不处理,server记录对应的path有响应事件发生还是会告诉给client
参照WatchManager#triggerWatch
粒度是(path)
问题
ServerCnxn怎么保存client连接会话信息的
源码还要自己再看
ClientCnxn.EventThread#queueEvent
里面验证状态是干吗用,后续研究请求发出和处理response的时候再看
refer
《paxos到zk》第7章
https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html
http://www.cnblogs.com/leesf456/p/6291004.html
http://blog.csdn.net/u012291108/article/details/59698624
http://blog.csdn.net/qianshangding0708/article/details/50084155