Zookeeper Watcher事件监听

2019-05-27  本文已影响0人  逆水寻洲

前言

zk作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅发布功能,就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。

zk的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。

zookeeper Watcher机制的特点

  1. 一次性的触发器(one-time trigger)

当数据改变的时候,Watch事件会产生并且被发送到客户端中。但是客户端只会收到一次这样的通知,如果以后这个数据再次发生改变的时候,之前设置Watch的客户端将不会再次收到改变的通知,因为Watch机制规定了它是一个一次性的触发器。
当设置监视的数据发生改变时,该监视事件会被发送到客户端。例如,如果客户端调用了 getData("/znode1", true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。

  1. 发送给客户端(Sent to the client)

这个表明了Watch的通知事件是从服务器发送给客户端的,是异步的,这就表明不同的客户端收到的Watch的时间可能不同,但是ZooKeeper有保证:当一个客户端在看到Watch事件之前是不会看到结点数据的变化的。例如:A=3,此时在上面设置了一次Watch,如果A突然变成4了,那么客户端会先收到Watch事件的通知,然后才会看到A=4。
Zookeeper 客户端和服务端是通过 Socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。

  1. 被设置Watch的数据(The data for which the watch was set)

这意味着 znode 节点本身具有不同的改变方式。你也可以想象 Zookeeper 维护了两条监视链表: 数据监视和子节点监视(data watches and child watches) 。
getData() and exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。
因此, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。

各种watch触发的情况总结

可以注册watcher的方法:getData、exists、getChildren。
可以触发watcher的方法:create、delete、setData。连接断开的情况下触发的watcher会丢失。
一个Watcher实例是一个回调函数,被回调一次后就被移除了。如果还需要关注数据的变化,需要再次注册watcher。
New ZooKeeper时注册的watcher叫default watcher,它不是一次性的,只对client的连接状态变化作出反应。

Watcher接口

在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)。

Watcher 接口定义如下

public interface Watcher {
    abstract public void process(WatchedEvent event);
}

Watcher通知状态与事件类型

keeperState和eventType对应关系如下所示:


image.png

Watcher接口测试代码

public class ZkClientWatcher implements Watcher {

    // 集群连接地址
    private static final String CONNECT_ADDRES = "127.0.0.1:2181";
    // 会话超时时间
    private static final int SESSIONTIME = 2000;
    // 信号量,让zk在连接之前等待,连接成功后才能往下走.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String LOG_MAIN = "[main]";
    private ZooKeeper zk;

    public void createConnection(String connectAddres, int sessionTimeOut) {
        try {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean createPath(String path, String data) {
        try {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 判断指定节点是否存在
     *
     * @param path 节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public boolean updateNode(String path, String data) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }

    public boolean deleteNode(String path) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.delete(path,-1);
        return false;
    }


    public void process(WatchedEvent watchedEvent) {

        // 获取事件状态
        Event.KeeperState keeperState = watchedEvent.getState();
        // 获取事件类型
        Event.EventType eventType = watchedEvent.getType();
        // zk 路径
        String path = watchedEvent.getPath();
        System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判断是否建立连接
        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                // 如果建立建立成功,让后程序往下走
                System.out.println(LOG_MAIN + "zk 建立连接成功!");
                countDownLatch.countDown();
            } else if (Event.EventType.NodeCreated == eventType) {
                System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
            } else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
            } else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
            }else if(Event.EventType.NodeChildrenChanged==eventType){
                System.out.println(LOG_MAIN + "事件通知,当前node节点的子节点列表发生改变:" + path );
            }
        }
        System.out.println("--------------------------------------------------------");
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
        zkClientWatcher.createPath("/watcher","watcherData");
        zkClientWatcher.updateNode("/watcher","new Data");
        zkClientWatcher.zk.getChildren("/watcher",true);
        zkClientWatcher.createPath("/watcher/child","child");
        zkClientWatcher.deleteNode("/watcher/child");
        zkClientWatcher.deleteNode("/watcher");
    }
}

运行结果如下:

[main]zk 开始启动连接服务器....
进入到 process() keeperState:SyncConnected, eventType:None, path:null
[main]zk 建立连接成功!
--------------------------------------------------------
进入到 process() keeperState:SyncConnected, eventType:NodeCreated, path:/watcher
[main]事件通知,新增node节点/watcher
--------------------------------------------------------
[main]节点创建成功, Path:/watcher,data:watcherData
进入到 process() keeperState:SyncConnected, eventType:NodeDataChanged, path:/watcher
[main]事件通知,当前node节点/watcher被修改....
--------------------------------------------------------
进入到 process() keeperState:SyncConnected, eventType:NodeCreated, path:/watcher/child
[main]事件通知,新增node节点/watcher/child
--------------------------------------------------------
进入到 process() keeperState:SyncConnected, eventType:NodeChildrenChanged, path:/watcher
[main]事件通知,当前node节点的子节点列表发生改变:/watcher
--------------------------------------------------------
[main]节点创建成功, Path:/watcher/child,data:child
进入到 process() keeperState:SyncConnected, eventType:NodeDeleted, path:/watcher/child
[main]事件通知,当前node节点/watcher/child被删除....
--------------------------------------------------------
进入到 process() keeperState:SyncConnected, eventType:NodeDeleted, path:/watcher
[main]事件通知,当前node节点/watcher被删除....
--------------------------------------------------------
Process finished with exit code 0
上一篇下一篇

猜你喜欢

热点阅读