Zookeeper

ZK 默认watch为什么只生效一次

2022-01-16  本文已影响0人  Alen_ab56

以create节点为例
有2种请求流转方式
第一种:client-follower-leader-client
第二种:client-leader-client

假设第一种:
client发送给follower,那么follower会把请求转发给leader,leader再把proposal(提案)发送给所有的follower。follower收到proposal后交由SyncRequestProcessor处理,主要是将事务写入事务日志;之后收到leader的COMMIT时,事务才会被应用到内存中的DataTree(即下文的DataTree#createNode)。follower处理proposal的代码如下:
Follower#processPacket
// PROPOSAL branch: decode the proposed transaction carried by the packet and
// hand it to fzk.logRequest.
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
// hdr is passed in and read right after, so it is presumably populated by
// deserializeTxn alongside the returned txn record.
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
// Only warns (does not reject) when the zxid is not exactly lastQueued + 1.
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
// Remember this zxid as the most recently queued one, regardless of the check above.
lastQueued = hdr.getZxid();

        // For reconfig transactions, the txn data is parsed as a quorum
        // configuration string and recorded as the last seen verifier.
        if (hdr.getType() == OpCode.reconfig){
           SetDataTxn setDataTxn = (SetDataTxn) txn;       
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
           self.setLastSeenQuorumVerifier(qv, true);                               
        }
        
        // Pass the decoded header and transaction on for logging.
        fzk.logRequest(hdr, txn);
        break;

DataTree#createNode
/**
 * Adds a new node at {@code path} to the in-memory tree, updates the parent's
 * stat, maintains the ephemeral/container/TTL and quota bookkeeping, and
 * finally fires the data watch on the new path and the child watch on the
 * parent.
 *
 * @param path full path of the node to create; split at the last '/' into
 *        parent path and child name
 * @param data payload stored in the new node (may be null, counted as 0 bytes
 *        for quota purposes)
 * @param acl ACL list, converted to a cached Long key via aclCache
 * @param ephemeralOwner owner value stored in the stat; also used to classify
 *        the node as container, TTL or ephemeral
 * @param parentCVersion cversion to set on the parent, or -1 to use the
 *        parent's current cversion plus one
 * @param zxid transaction id stored as czxid/mzxid/pzxid of the new node and
 *        as pzxid of the parent
 * @param time timestamp stored as ctime and mtime of the new node
 * @param outputStat if non-null, receives a copy of the new node's stat
 * @throws KeeperException.NoNodeException if the parent path is not in the tree
 * @throws KeeperException.NodeExistsException if the parent already has a
 *         child with this name
 */
public void createNode(final String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
// Split "/a/b/c" into parent "/a/b" and child "c"; a top-level node yields
// an empty parentName.
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
// Build the stat for the new node: all timestamps/zxids come from this
// transaction, and both version counters start at 0.
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
// The existence check and the insertion happen under the parent's monitor.
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}

        // -1 means the caller did not supply a cversion: bump the parent's
        // current one instead.
        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }
        parent.stat.setCversion(parentCVersion);
        parent.stat.setPzxid(zxid);
        Long longval = aclCache.convertAcls(acl);
        DataNode child = new DataNode(data, longval, stat);
        parent.addChild(childName);
        nodes.put(path, child);
        // Track the path in the collection matching its ephemeral type:
        // containers, ttls, or the per-owner ephemerals set.
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.get(ephemeralOwner);
            if (list == null) {
                list = new HashSet<String>();
                ephemerals.put(ephemeralOwner, list);
            }
            synchronized (list) {
                list.add(path);
            }
        }
        if (outputStat != null) {
            child.copyStat(outputStat);
        }
    }
    // now check if its one of the zookeeper node child
    if (parentName.startsWith(quotaZookeeper)) {
        // now check if its the limit node
        if (Quotas.limitNode.equals(childName)) {
            // this is the limit node
            // get the parent and add it to the trie
            pTrie.addPath(parentName.substring(quotaZookeeper.length()));
        }
        if (Quotas.statNode.equals(childName)) {
            updateQuotaForPath(parentName
                    .substring(quotaZookeeper.length()));
        }
    }
    // also check to update the quotas for this node
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
        // ok we have some match and need to update
        updateCount(lastPrefix, 1);
        updateBytes(lastPrefix, data == null ? 0 : data.length);
    }
    // Fire watches last, after the tree and quota state are updated:
    // NodeCreated on the new path, NodeChildrenChanged on the parent
    // (an empty parentName is reported as "/").
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            Event.EventType.NodeChildrenChanged);
}

WatchManager#triggerWatch
/**
 * Delivers an event of the given type for {@code path} to every watcher
 * registered on that path, removing those registrations in the process.
 *
 * @param path path the event is about
 * @param type event type placed in the WatchedEvent
 * @param supress watchers that must not be called back (may be null); they
 *        are still removed and still included in the returned set
 * @return the set of watchers that was registered on {@code path}, or null
 *         if there was none
 */
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// remove(), not get(): the registrations for this path are taken out of
// watchTable as they are fetched, so a later event on the same path finds
// nothing unless the watch is registered again.
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
// Keep the reverse index (watcher -> paths) consistent with watchTable by
// dropping this path from each affected watcher's entry.
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
// Callbacks run after the synchronized block, on the already-detached set.
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}

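关键看这2部分:

1. watchers = watchTable.remove(path);
2. for (Watcher w : watchers) {
       HashSet<String> paths = watch2Paths.get(w);
       if (paths != null) {
           paths.remove(path);
       }
   }

也就是说,事件触发时,该path对应的watcher集合先被从watchTable中移除(remove而不是get),每个watcher在watch2Paths中记录的该path也一并删除,之后才回调w.process(e)。回调之后服务端已不再保存这次watch,所以默认的watch只会生效一次,想继续监听就必须重新注册。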

——————源码面前,了无秘密——————

上一篇 下一篇

猜你喜欢

热点阅读