(9)zookeeper的客户端api的基本操作

2018-11-26  本文已影响0人  Mrsunup

zookeeper的开源客户端有基于原生的zookeeper的客户端还有zkclient还有curator,下面就针对原生的api和curator的api来实现zookeeper的基本节点的操作

1.基于zookeeper原生的客户端展示

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.8</version>
    </dependency>
/**
 * @Project: 3.DistributedProject
 * @description: 用原生zookeeper客户端的api
 * @author: sunkang
 * @create: 2018-06-23 13:04
 * @ModificationHistory who      when       What
 **/
public class OrginZookeeperConnectionDemo {

    public static void main(String[] args) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Watcher watcher =  new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if( watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    //连接成功会有SyncConnected事件产生
                    System.out.println("默认事件"+watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
                    //如果收到了服务端的响应事件,连接成功,接下来才可以对zookeeper的数据节点进行操作
                    countDownLatch.countDown();
                }
            }};

        final ZooKeeper zooKeeper = new ZooKeeper("192.168.44.129:2181", 4000,watcher );

        countDownLatch.await();

        zooKeeper.exists("/zk-test-create", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
                //再次绑定
                try {
                    //这里会触发默认的全局事件
                    zooKeeper.exists(watchedEvent.getPath(),true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

        //1.创建临时节点
        zooKeeper.create("/zk-test-create","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        Thread.sleep(1000);
        System.out.println("添加节点成功");


        Stat state = new Stat();
        //2.得到当前节点的值
        byte[]  bytes =  zooKeeper.getData("/zk-test-create",null,state);
        System.out.println("createNode的当前的值为:"+ new String(bytes));

        //3.修改当前节点的值
        zooKeeper.setData("/zk-test-create","2".getBytes(),state.getVersion());
        //得到当前节点的值
        byte[]  byte1s =  zooKeeper.getData("/zk-test-create",null,state);
        System.out.println("createNode的修改后值为 : "+ new String(byte1s));


        //4.查看子节点
        List<String> childrenList =  zooKeeper.getChildren("/",false);
        System.out.println("childrenList: "+childrenList);

        //5.删除节点的值
        zooKeeper.delete("/zk-test-create",state.getVersion());


        //6.设置权限认证
        zooKeeper.addAuthInfo("digest","foo:true".getBytes());
        zooKeeper.create("/zk-book-auth_test","init".getBytes(),ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);

        //7.新建客户端需要认证获取对应的节点
       byte[] authBytes =  zooKeeper.getData("/zk-book-auth_test",false,null);
        System.out.println("/zk-book-auth_test的value:"+ new String(authBytes) );

        zooKeeper.close();
//        System.in.read();
    }
}
默认事件null->SyncConnected->None
/zk-test-create->SyncConnected->NodeCreated
添加节点成功
createNode的当前的值为:0
默认事件/zk-test-create->SyncConnected->NodeDataChanged
createNode的修改后值为 : 2
childrenList: [curator_recipes_distatomicint_path, zookeeper, zk-test-create, curator_recipes_master_path, curator_recipes_lock_path, locks, c1, curator_recipes_barrier_path]
/zk-book-auth_test的value:init

2..基于curator的客户端展示

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
  </dependencies>
/**
 * @Project: 3.DistributedProject
 * @description: 使用curator来操作节点
 * @author: sunkang
 * @create: 2018-06-23 14:25
 * @ModificationHistory who      when       What
 **/
public class CuratorDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework curator =CuratorFrameworkFactory.builder()
                .connectString("192.168.44.129:2181")
                .connectionTimeoutMs(4000)//连接超时时间设置4秒中
                .sessionTimeoutMs(4000)//session超时设置4秒中
                .retryPolicy(new ExponentialBackoffRetry(1000,3))//设置连接的重试机制
                .namespace("curator")//设置命名空间,表明接下来的节点操作都在/curator的下进行操作
                .build();

        //启动
        curator.start();

        //1.创建节点  creatingParentsIfNeeded如果子节点的父级节点不存在,会联级创建
        curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/sunkang/test","sunkang".getBytes());
        System.out.println("创建节点成功");

        //2.获取节点的状态
        Stat state =new Stat();
        System.out.println("/sunkang/test的值: "+ new String(curator.getData().storingStatIn(state).forPath("/sunkang/test")));

        //3.设置改变节点
        curator.setData().withVersion(state.getAversion()).forPath("/sunkang/test","xx".getBytes());

        //4.获取子节点
        List<String> childrens = curator.getChildren().forPath("/sunkang");
        System.out.println("childrens : "+childrens);

        //5.检查是否存在
        Stat stat =  curator.checkExists().forPath("/sunkang");
        System.out.println("state: "+ stat);

        //6.使用watcher
        curator.getChildren().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getPath()+"->"+event.getState()+"->"+event.getType());
            }
        }).forPath("/sunkang");
        //7.删除节点,deletingChildrenIfNeeded表示级联删除
        curator.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath("/sunkang");

        System.in.read();
    }
}
创建节点成功
/sunkang/test的值: sunkang
childrens : [test]
state: 505,505,1542703973660,1542703973660,0,1,0,0,0,1,506

/sunkang->SyncConnected->NodeChildrenChanged

3..基于curator的客户端监听的展示

主要利用了NodeCache和PathChildrenCache以及和TreeCache来实现监听。
代码如下 :

/**
 * @Project: 3.DistributedProject
 * @description: curator实现监听
 * @author: sunkang
 * @create: 2018-06-23 14:45
 * @ModificationHistory who      when       What
 **/
public class CuratorWatcherDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curator =CuratorFrameworkFactory.builder()
                .connectString("192.168.44.129:2181")
                .connectionTimeoutMs(4000)
                .sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .namespace("curator")
                .build();

        curator.start();
        //当前节点的监听
//        addListenerWhitNodeCash(curator,"/sunkang");
        //监听子节点的监听
//        addListenerWhitPathChildCash(curator,"/sunkang");
        //综合性事件
        addListenerWithTreeCache(curator,"/sunkang");
        System.in.read();

    }

    /**
     * 即节点的监听又监听子节点的监听
     * @param curator
     * @param s
     * @throws Exception
     */
    private static void addListenerWithTreeCache(CuratorFramework curator, String s) throws Exception {
       final TreeCache treeCache = new TreeCache(curator,s);
       TreeCacheListener listener = new TreeCacheListener() {
           @Override
           public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
               System.out.println("zonghe "+treeCacheEvent.getData()+";"+treeCacheEvent.getType());
           }
       };
       treeCache.getListenable().addListener(listener);
       treeCache.start();
    }

    /**
     *对给具体的节点的子节点的增加监听,子节点的删除,创建和数据节点的内容发生变化,会触发监听事件
     * @param curator
     * @param s
     * @throws Exception
     */
    private static void addListenerWhitPathChildCash(final CuratorFramework curator, String s) throws Exception {
        final PathChildrenCache pathChildrenCache =new PathChildrenCache(curator,s,true);

        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED," + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED," + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED," + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start();

    }

    /**
     * 给具体的节点的增加监听,创建,删除,数据值改变
     * @param curator
     * @param s
     * @throws Exception
     */
    private static void addListenerWhitNodeCash(CuratorFramework curator, String s) throws Exception {
        final NodeCache nodeCache = new NodeCache(curator,s);
        NodeCacheListener nodeCacheListener = new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(nodeCache.getCurrentData().getData()));
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }
}

更多的操作内容可以参考:https://github.com/sunkang123/zookeeper

上一篇下一篇

猜你喜欢

热点阅读