我爱编程

我所知道的zookeeper

2018-06-11  本文已影响0人  小吴可是全村的希望

初识zookeeper

那一年,听见一个叫做dubbo的东西,据说是阿里的,只知道这个牛逼的东西能让你在不同的系统之间的远程调用像本地方法一样去调用,我心甚往,遂观之,然识君。当时觉得他就是个注册中心,后来闲暇时深入研究了下,一发不可收拾,发现zookeeper竟然能做如此多的事情,请听我慢慢道来。

zookeeper的数据类型

我理解的zk其实是一个文件系统,比如:/user/home,/opt/logs等,这些节点在zk中都称为znode,znode的类型分为四种:临时节点,持久节点,临时顺序节点,持久顺序节点

zookeeper的用法特性

客户端创建节点之前需要连接zk服务,连接代码如下:

/**
 * @author 吴镇
 * @description:创建一个简单的zookeeper会话
 * @createdate 2016/9/9
 */
public class ZkSimple implements Watcher{

    private static ZkSimple zkSimple = null;

    //在当前线程中调用CountDownLatch的await方法会使得当前线程一直处于阻塞状态,每次调用countDown
    //方法会使得计数器减一
    //直到计数器变成0,当前线程即可恢复执行
    //可用来使当前线程等待其他线程完成任务
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper = null;

    private static Stat stat = new Stat();

    /**
     * 负责处理来自zk服务端的通知
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("接收watched事件:" + watchedEvent);
        if(KeeperState.SyncConnected == watchedEvent.getState()){
            //判断事件类型
            if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
                //将计数器减一
                countDownLatch.countDown();
            }else if(Event.EventType.NodeChildrenChanged == watchedEvent.getType()){//子节点变更事件
                try {
                    List<String> list = zooKeeper.getChildren(watchedEvent.getPath(),true);
                    System.out.println("子节点变更后重新获取节点数据:"+list);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("获取子节点失败");
                }
            }else if(Event.EventType.NodeDataChanged == watchedEvent.getType()){ //节点数据变更事件
                String data = "";
                try {
                    //数据变更后从新获取数据,并且重新注册watcher
                    data = ZkDataSync.getData(watchedEvent.getPath()); 
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("数据变更后获取的数据为:" + data);
            }

        }
    }

    public static ZooKeeper connectZkService(String connectString, int timeOut) throws IOException {
        //参数说明
        //参数1:连接zk服务列表,如果有多台zk节点,使用逗号隔开
        //参数2:指会话超时时间,单位毫秒
        //参数3:为一个实例化的watcher
        //构造方法内部实现了与zk服务端之间的TCP连接创建
        zooKeeper = new ZooKeeper(connectString,timeOut,new ZkSimple());
        System.out.println("当前连接状态:"+zooKeeper.getState());

        //由于zk服务端的连接是一个异步的过程,所以这里先阻塞当前线程
        //一直阻塞当前main线程,直到计数器为0是抛出异常
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {}

        //main线程阻塞状态解除
        System.out.println("zk会话已建立");
        return zooKeeper;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
    // ZkSimple.connectZkService("10.21.40.55:2181",5000);
    }
}

创建连接的类中,实现了Watcher接口,在创建连接的时候将这个类的实例作为参数带过去,表示这个类就是一个Watcher,并且实现了Watcher接口的process方法,watcher在zk中是一个非常重要的概念,当客户端创建节点的时候,当前节点便有了一个watcher,当节点创建数据或者数据变更或者子节点变更的时候,客户端都能接收到这个变化的事件(就是process方法),但是,仅仅只能收到变更的事件,而不能收到变更的内容。timeout表示会话超时时间,单位毫秒,由于创建连接的过程是异步的,所以这里使用CountDownLatch的await方法来阻塞main线程,直到zk服务创建成功(即收到服务端的回调请求,调用process方法),至此连接创建成功。其中process方法实现了Watcher接口中的方法,为zk服务端的回调方法。
当连接创建完了之后,便可以开始创建节点和数据了,客户端向zk注册节点的时候有两种方式:同步和异步,我们来试试同步创建一个临时顺序节点试试:

/**
 * @author 吴镇
 * @description:
 * @createdate 2016/9/13
 */
public class ZkCreateNodeSync {

    public static String createZnode(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            zooKeeper = ZkSimple.connectZkService("10.21.40.55:2181",1000 * 10);
        } catch (IOException e) {
            System.out.println("连接zk服务端异常");
        }
        return zooKeeper.create(path,data,acl,createMode);
    }

    public static void main(String[] args) {
        try {
            //CreateMode.EPHEMERAL-->创建临时节点CreateMode.EPHEMERAL_SEQUENTIAL-->创建临时顺序节点
            String path = ZkCreateNodeSync.createZnode("/我是一个临时节点","".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("创建znode的路径为:"+path);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("创建znode失败");
        }
    }
}

创建一个会话超时时间为10S的zk连接,然后创建一个临时顺序节点后,我们去看看zk服务端的节点状况,采用ls命令来查看节点:

ls /
[我是一个临时节点0000000011, dubbo, zk, test, zookeeper]

可以看到临时节点被创建了,并且节点名称后面被添加了0000000011,在10s后,这个节点自动被zk删除了。
再来看看异步创建节点,异步创建节点,需要提供一个回调函数:

/**
 * @author 吴镇
 * @description:异步创建节点
 * @createdate 2016/9/13
 */
public class ZkCreateNodeAsync {

    public static void createZnode(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback sc, Object ctx) throws KeeperException, InterruptedException {
        ZooKeeper zooKeeper = null;
        try {
            zooKeeper = ZkSimple.connectZkService("10.21.40.55:2181",1000 * 20);
        } catch (IOException e) {
            System.out.println("连接zk服务端异常");
        }
        //异步创建节点
        zooKeeper.create(path,data,acl,createMode,sc,ctx);
    }

    public static void main(String[] args) {
        try {
            //CreateMode.EPHEMERAL-->创建临时节点   CreateMode.EPHEMERAL_SEQUENTIAL-->创建临时顺序节点
            ZkCreateNodeAsync.createZnode("/我是一个异步创建的节点临时","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),"我是一个上下文");
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("异步创建znode失败");
        }
    }
}

class IStringCallback implements AsyncCallback.StringCallback{

    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("创建节点回调数据:["+rc+"," + path + "," + ctx + ", 真实的路径名称为:" + name);
    }
}

其中IStringCallback类就是提供回调函数的,回调函数中会返回这个节点的全路径,其实同步和异步创建过程基本相同,只不过异步不阻塞创建线程而已。当在某一个节点下面创建子节点的时候,会收到zk服务端的通知(注册了watcher),事件类型为:Event.EventType.NodeChildrenChanged(子节点变更事件),变更事件中包含变更的节点路径,根据这个路径我们可以重新通过getChildren方法去获取子节点列表。数据变更的事件为:Event.EventType.NodeDataChanged(节点数据变更事件),可以通过getData方法来获取节点变更的数据,<font color='red'>根据以上特性,我们可以使用zookeeper来实现集中式配置中心</font>。创建数据这里就不多将了,也可以分为同步和异步创建,有一点需要注意,zk的数据只能是byte类型的

zookeeper的客户端

在上面已经实现了简单的客户端代码,只不过,还有诸多问题,比如,zookeeper的watcher是一次性的,当收某个节点的通知的时候,这个节点上的watcher就会被删除掉了,在实际应用中,频繁的去注册watcher会很浪费资源。还有当会话超时之后,没有重新连接的机制。还有数据类型只有简单的byte,没有对象级别的存储。还有删除节点的时候无法级联删除。针对这些问题,有了一些开源的客户端:Curator和ZkClient,这两个客户端各有优劣,在dubbo中就是使用了ZkClient,ZkClient解决了session超时,watcher一次性等问题,但是文档几乎没有,异常处理过于简单,没有提供现成的各种场景实现。Curator相对来说好很多,也是解决了zk存在的那些问题,并且提供了分布式锁, Master选举,分布式计算器等现成的实现,并且这货现在还是apache的顶级项目,这个身价是ZkClient没法比的呢!!!!~~

zookeeper可以做啥

上一篇 下一篇

猜你喜欢

热点阅读