zookeeper 客户端使用

2020-02-23  本文已影响0人  香沙小熊

1.原生

public class ZKWatch implements Watcher {
    private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    private ZooKeeper zooKeeper;

    public ZKWatch() {
        try {
            zooKeeper = new ZooKeeper(CONNECT_ADDR, 5000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZKWatch zkWatch = new ZKWatch();

        if(zkWatch.exists("/xionghu",true)!=null){
            zkWatch.deleteRecursive("/xionghu");
        }
        zkWatch.addPZnode("/xionghu", "2020");
        //zkWatch.addPZnode("/xionghu/aaa", "2019");



        Thread.sleep(2000000);

    }

    public Stat exists(String path,boolean watch) throws KeeperException, InterruptedException {
        return zooKeeper.exists(path, watch);

    }

    public void delete(String path) throws KeeperException, InterruptedException {
        zooKeeper.delete(path, -1);
    }

    public void deleteRecursive(String path) throws KeeperException, InterruptedException {
        ZKUtil.deleteRecursive(zooKeeper, path);
    }

    /**
     * 创建znode结点
     *
     * @param path 结点路径
     * @param data 结点数据
     * @return true 创建结点成功 false表示结点存在
     * @throws Exception
     */
    public boolean addZnodeData(String path, String data, CreateMode mode) {
        try {
            if (zooKeeper.exists(path, true) == null) {
                zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
                return true;
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("创建znode:" + path + "出现问题!!", e);
        }
        System.out.println("znode" + path + "结点已存在");
        return false;
    }

    public boolean addZnodeData(String path, String data) {
        try {
            zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (Exception e) {
            //logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
            return false;
        }
    }

    /**
     * 创建永久znode结点
     *
     * @param path 结点路径
     * @param data 结点数据
     * @return true 创建结点成功 false表示结点存在
     * @throws Exception
     */
    public boolean addPZnode(String path, String data) {
        return addZnodeData(path, data, CreateMode.PERSISTENT);
    }

    /**
     * 创建临时znode结点
     *
     * @param path 结点路径
     * @param data 结点数据
     * @return true 创建结点成功 false表示结点存在
     * @throws Exception
     */
    public boolean addZEnode(String path, String data) {
        return addZnodeData(path, data, CreateMode.EPHEMERAL);
    }

    /**
     * 修改znode
     *
     * @param path 结点路径
     * @param data 结点数据
     * @return 修改结点成功   false表示结点不存在
     */
    public boolean updateZnode(String path, String data) {
        try {
            Stat stat = null;
            if ((stat = zooKeeper.exists(path, true)) != null) {
                zooKeeper.setData(path, data.getBytes(), stat.getVersion());
                return true;
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("修改znode:" + path + "出现问题!!", e);
        }
        return false;
    }

    /**
     * 删除结点
     *
     * @param path 结点
     * @return true 删除键结点成功  false表示结点不存在
     */
    public boolean deleteZnode(String path) {
        try {
            Stat stat = null;
            if ((stat = zooKeeper.exists(path, true)) != null) {
                List<String> subPaths = zooKeeper.getChildren(path, false);
                if (subPaths.isEmpty()) {
                    zooKeeper.delete(path, stat.getVersion());
                    return true;
                } else {
                    for (String subPath : subPaths) {
                        deleteZnode(path + "/" + subPath);
                    }
                }
            }
        } catch (InterruptedException | KeeperException e) {
            throw new RuntimeException("删除znode:" + path + "出现问题!!", e);
        }
        return false;
    }

    /**
     * 取到结点数据
     *
     * @param path 结点路径
     * @return null表示结点不存在 否则返回结点数据
     */
    public String getZnodeData(String path) {
        String data = null;
        try {
            Stat stat = null;
            if ((stat = zooKeeper.exists(path, true)) != null) {
                data = new String(zooKeeper.getData(path, true, stat));
            } else {
                System.out.println("znode:" + path + ",不存在");
            }
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException("取到znode:" + path + "出现问题!!", e);
        }
        return data;
    }



    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println("连接状态:"+watchedEvent.getState()+"   "+ "时间类型 "+watchedEvent.getType()+
           "受影响的path"+ watchedEvent.getPath());
    }

}

2.ZkClient

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version></version>
</dependency>
public class ZkClientCrud<T> {
    private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    private ZkClient zkClient;

    public ZkClientCrud() {
        this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
    }

    /**
     * 创建持久节点
     */
    public void createPersistent(String path, Object data) {
        zkClient.createPersistent(path, data);
    }

    public T readData(String path) {
        return zkClient.readData(path);
    }

    //递归删除
    public void deleteRecursive(String path) {
        zkClient.deleteRecursive(path);

    }

    public void delete(String path) {
        zkClient.delete(path);

    }

    /***
     * 子节点
     * @param path
     * @return
     */
    public List<String> getChildren(String path){
        return zkClient.getChildren(path);

    }
}
public class ZkClientWatcher {
    private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    ZkClient zkClient;

    public ZkClientWatcher() {
        this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
    }


    public void createPersistent(String path, Object data) {
        zkClient.createPersistent(path, data);
    }


    public void writeData(String path, Object object) {
        zkClient.writeData(path, object);

    }

    public void delete(String path) {
        zkClient.delete(path);

    }

    public boolean exists(String path) {
        return zkClient.exists(path);

    }

    public void deleteRecursive(String path) {
        zkClient.deleteRecursive(path);

    }

    //对父节点添加监听数据变化。
    public void subscribe(String path) {


        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("删除的节点为:%s\r\n", dataPath);
            }
        });
    }

    //对父节点添加监听子节点变化。
    public void subscribe2(String path) {
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n");
            }
        });
    }


    //客户端状态
    public void subscribe3(String path) {
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    //当我重新启动后start,监听触发
                    System.out.println("连接成功");
                } else if (state == Watcher.Event.KeeperState.Disconnected) {
                    System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
                } else {
                    System.out.println("其他状态" + state);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");

            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {

            }
        });

3.Curator

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.Map;

/**
 * @date: 2020/2/23 15:25
 * @author: xionghu
 * @desc:
 */
public  class CuratorUtil {
    private CuratorFramework client;


    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }


    /**
     * 创建node
     *
     * @param nodeName
     * @param value
     * @return
     */
    public boolean createNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat == null) {
                String opResult = null;
                if (Strings.isNullOrEmpty(value)) {
                    opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                }
                else {
                    opResult =
                            getClient().create().creatingParentsIfNeeded()
                                    .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                }
                suc = Objects.equal(nodeName, opResult);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }


    /**
     * 更新节点
     *
     * @param nodeName
     * @param value
     * @return
     */
    public boolean updateNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat != null) {
                Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                suc = opResult != null;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }


    /**
     * 删除节点
     *
     * @param nodeName
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 找到指定节点下所有子节点的名称与值
     *
     * @param node
     * @return
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> map = Maps.newHashMap();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            List<String> children = childrenBuilder.forPath(node);
            GetDataBuilder dataBuilder = getClient().getData();
            if (children != null) {
                for (String child : children) {
                    String propPath = ZKPaths.makePath(node, child);
                    map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }


    /**
     * 列出子节点的名称
     *
     * @param node
     * @return
     */
    public List<String> listChildren(String node) {
        List<String> children = Lists.newArrayList();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            children = childrenBuilder.forPath(node);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return children;
    }


    /**
     * 增加监听
     *
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        }
        else {
            getClient().getChildren().watched().forPath(node);
        }
    }


    /**
     * 增加监听
     *
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * 增加监听
     *
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }


    /**
     * 销毁资源
     */
    public void destory() {
        if (client != null) {
            client.close();
        }
    }


    /**
     * 获取client
     *
     * @return
     */
    public CuratorFramework getClient() {
        return client;
    }

}
final class NodeEventListener implements CuratorListener {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.toString() + ".......................");
        final WatchedEvent watchedEvent = event.getWatchedEvent();
        if (watchedEvent != null) {
            System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                switch (watchedEvent.getType()) {
                    case NodeChildrenChanged:

                        break;
                    case NodeDataChanged:
                        // TODO
                        break;
                    default:
                        break;
                }
            }
        }
    }
}
public class CuratorTest {
    private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";



    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(CONNECT_ADDR);
        curator.createNode("/root/test1", "abc1");
        curator.createNode("/root/test2", "abc2");
        curator.updateNode("/root/test2", "abc3");
        List<String> list = curator.listChildren("/root");
        Map<String, String> map = curator.listChildrenDetail("/root");
        // curator.deleteNode("/zkroot");
        // curator.destory();
        System.out.println("=========================================");
        for (String str : list) {
            System.out.println(str);
        }

        System.out.println("=========================================");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "=>" + entry.getValue());
        }

        // 增加监听
        curator.addWatch("/root", false);

        TimeUnit.SECONDS.sleep(600);
    }

}

上一篇 下一篇

猜你喜欢

热点阅读