zookeeper(3.4.12)客户端curator使用

2018-10-26  本文已影响0人  全都是泡沫啦
package zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class TestCurator {
    public static final Logger logger = LoggerFactory.getLogger(TestCurator.class);
    public CuratorFramework client = null;

    @Before
    public void start() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        client = CuratorFrameworkFactory
                .builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("test")
                .build();
        client.start();
    }

    /**
     * zookeeper恢复数据:从事物日志中进行恢复数据(事物日志类似于mysql的bin log)
     * 每次操作都会有一个事物id,连接 关闭 增删改 包括创建失败删除失败(对于查询和添加watch不增加事物id)
     */

    /**
     cZxid = 0x3         创建节点的事物id
     ctime = Fri Oct 2   创建节点的时间
     mZxid = 0x34        节点数据修改的事物id
     mtime = Fri Oct 2   节点数据修改的时间
     pZxid = 0x35        子节点删除或者增加的事物id(不包含子节点的数据修改)
     cversion = 8        子节点删除或者增加的版本  (不包含子节点的数据修改)
     dataVersion = 1     当前节点的数据修改的版本
     aclVersion = 0      权限控制
     ephemeralOwner =    临时节点的会话id
     dataLength = 3      当前节点数据的长度
     numChildren = 4     当前节点的子节点数
     */
    /**
     子节点删除增加:影响父节点pZxid cversion numChildren
     当前节点数据修改:影响当前节点 data mZxid mtime dataVersion dataLength
     节点创建:ctime cZxid
     权限:aclVersion
     临时节点:ephemeralOwner
     子节点的数据修改不影响父节点
     */


    /**
     * 只监控直属子节点的增删,直属子节点数据修改 (当删除该节点,在创建时无法继续监听)
     * /test/province不存在会自动创建
     * 启动时 /test/province子节点存在会触发
     *
     * @throws Exception
     */
    @Test
    public void childChange() throws Exception {
        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/province", true);
        ListenerContainer<PathChildrenCacheListener> listenable = pathChildrenCache.getListenable();
        listenable.addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                logger.info("==============================================");
                logger.info("curatorFramework:" + curatorFramework);
                logger.info("pathChildrenCacheEvent:" + pathChildrenCacheEvent);
                List<ChildData> currentData = pathChildrenCache.getCurrentData();
                logger.info("currentData:" + currentData);
            }
        });
        pathChildrenCache.start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 监听当前节点 删除(currentData == null) 修改数据 创建该节点(即使从父类开始新建) 启动存在会触发
     *
     * @throws Exception
     */
    @Test
    public void nodeChange() throws Exception {
        final NodeCache nodeCache = new NodeCache(client, "/province/shanxi");
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                logger.info("==============================================");
                logger.info("nodeChanged:" + currentData);
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Test
    public void treeChange() throws Exception {
        TreeCache treeCache = new TreeCache(client, "/");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                logger.info("==============================================");
                logger.info("treeCacheEvent:" + treeCacheEvent);
            }
        });
        treeCache.start();
        Thread.sleep(Integer.MAX_VALUE);
    }


    @Test
    public void createContainers() throws Exception {
        client.createContainers("/container");
    }

    @Test
    public void createNode() throws Exception {
        client.create().forPath("/province");
    }

    @Test
    public void createPersistentNode() throws Exception {
        client.create().withMode(CreateMode.PERSISTENT).forPath("/shanxi");
        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/hebei");
    }

    @Test
    public void createEphemeralNode() throws Exception {
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/shanxi1");
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/hebei1");
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Test
    public void createChildNode() throws Exception {
        client.create().forPath("/province/shanxi");
    }

    @Test
    public void deleteNode() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/province");
    }

    @Test
    public void setData() throws Exception {
        client.setData().forPath("/province", "data".getBytes());
    }

    @Test
    public void createNodes() throws Exception {
        client.create().creatingParentsIfNeeded().forPath("/province/shanxi");
        client.create().creatingParentContainersIfNeeded().forPath("/province1/shanxi1");
    }

    @Test
    public void deleteParentNode() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/province");
    }

    @Test
    public void getDataStat() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/");
        logger.info(stat.toString());
    }

    @Test
    public void getData() throws Exception {
        byte[] bytes = client.getData().forPath("/");
        logger.info(new String(bytes));
    }

    @Test
    public void setVersionData() throws Exception {
        Stat stat = client.setData().withVersion(3).forPath("/province", "data".getBytes());
        logger.info(stat.toString());
    }

    @After
    public void stop() {
        if (client != null) {
            client.close();
        }
    }
}



package zk.curator;

/**
 * Created by paul on 2018/10/27.
 */

import org.apache.zookeeper.client.FourLetterWordMain;
import org.apache.zookeeper.server.LogFormatter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class TestZookeeperTools {
    Logger logger = LoggerFactory.getLogger(TestZookeeperTools.class);

    /**
     * 对zookeeper的事物日志进行查看
     * 18-10-26 下午06时27分03秒 session 0x10000182f6a0003 cxid 0x3 zxid 0xb setData '/test,#6e6968616f,1
     * session 会话id
     * cxid    客户端操作id
     * zxid    服务端全局事物id
     *
     * @throws Exception
     */
    @Test
    public void logExplain() throws Exception {
        LogFormatter.main(new String[]{"C:\\pengrun\\work\\software\\zookeeper-3.4.12\\bin\\data\\version-2\\log.1"});
    }

    /**
     * 对zookeeper的运行状态进行socket命令行查看
     *
     * @throws IOException
     */
    /*
    static {
        cmd2String.put(confCmd, "conf");打印出server的配置信息
        cmd2String.put(consCmd, "cons");打印出server上的所有连接/会话  ***************
        cmd2String.put(crstCmd, "crst");重置有关连接/会话的统计数据
        cmd2String.put(dumpCmd, "dump");这个命令只有发给ensemble中的爹有才效. 列出所有重要的会话和生命周期随会话的znode
        cmd2String.put(enviCmd, "envi");打印出server运行环境
        cmd2String.put(getTraceMaskCmd, "gtmk");获取当前的trace mask值, 以10进制64位有符号数值形式返回, 具体trace mask的含义下面会讲
        cmd2String.put(ruokCmd, "ruok"); are you ok, 如果回复 imok, 则说明这个server很健康. 如果server有问题, 则不会收到回复. 需要注意的是, 一个server回复了ruok不代表这个server在ensemble中的状态是正常的, 这仅代表server进程正常启动了. 要查看ensemble的概况需要用stat命令
        cmd2String.put(setTraceMaskCmd, "stmk");设置trace mask
        cmd2String.put(srstCmd, "srst");重置server上的所有统计数据
        cmd2String.put(srvrCmd, "srvr");列出server的全部信息
        cmd2String.put(statCmd, "stat");列出server的细节信息和与之相连的clients
        cmd2String.put(wchcCmd, "wchc");列出监控这个server的所有会话, 并列出每个会话监控的名称空间路径. 注意, 在会话较多的server上, 这个命令可能会相当耗时
        cmd2String.put(wchpCmd, "wchp");列出被监控的所有层级名称空间路径, 以及相关的会话. 注意同上, 这个命令也可能会相当耗时
        cmd2String.put(wchsCmd, "wchs");列出对该server的所有监控(watch)
        cmd2String.put(mntrCmd, "mntr");列出有关ensemble的一系列状态值. 通过这些状态值可以查看整个ensemble是不是正常
        cmd2String.put(isroCmd, "isro");检查server是否运行在只读状态, 回复ro代表server在只读状态, 回复rw代表server在可读可写状态
    }
     */
    @Test
    public void sendCmd() throws IOException {
        String commands = System.getProperty("zookeeper.4lw.commands.whitelist");
        //ServerCnxn.isEnabled(cmd)
        //System.getProperty(ZOOKEEPER_4LW_COMMANDS_WHITELIST);
        //private static final String ZOOKEEPER_4LW_COMMANDS_WHITELIST = "zookeeper.4lw.commands.whitelist";
        //String[] list = commands.split(",");
        //java启动设置 -Dzookeeper.4lw.commands.whitelist=*

        String results0 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "cons");//当前session链接详细信息
        String results1 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "dump");//session和临时节点
        logger.info("\n" + results0);
        System.out.println("=======================================");
        logger.info("\n" + results1);
        String results2 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "wchc");//session和它watch的路径
        System.out.println("=======================================");
        logger.info("\n" + results2);
    }
}


上一篇下一篇

猜你喜欢

热点阅读