Java客户端Curator
2019-10-08 本文已影响0人
Audience0
Curator是Netflix公司开源的一个Zookeeper客户端,现在是apache下的一个开源项目,与Zookeeper提供的原生客户端相比,Curator的进行了高度抽象和封装,简化了Zookeeper客户端的开发。
Netflix:https://github.com/Netflix/curator
maven依赖
<!--https://mvnrepository.com/artifact/org.apache.curator/curator-recipes/4.2.0-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.curator/curator-framework/4.2.0-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
zookeeper 基本操作代碼如下:
public CuratorClient() {
//重试策略,重试3次,间隔2S
RetryPolicy retryPolicy = new RetryNTimes(3,2000);
//创建zookeeper连接
client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retryPolicy);
//开启客户端
client.start();
}
public static void main(String[] args) throws Exception {
CuratorClient curatorClient = new CuratorClient();
System.out.println("zookeeper连接成功client:" + curatorClient.client);
//查看连接状态 LATENT 未start, STARTED start了
System.out.println(curatorClient.client.getState().name());
//创建节点 curator流式的编码方式 creatingParentsIfNeeded如果需要把父节点也创建
/* String node = curatorClient.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(ROOT_NODE + "/child","child".getBytes());
*/
// System.out.println(node);
//更新节点
// Stat stat = curatorClient.client.setData().withVersion(-1).forPath(ROOT_NODE, "curator".getBytes());
// System.out.println(stat);
//读取数据 将数据存在stat里
// Stat dataStat = new Stat();
// byte[] bytes = curatorClient.client.getData().storingStatIn(dataStat).forPath(ROOT_NODE);
// System.out.println(new String(bytes));
// System.out.println(dataStat);
//curatorClient.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(ROOT_NODE + "/child1","child1".getBytes());
//查询节点
//List<String> childStrList = curatorClient.client.getChildren().forPath(ROOT_NODE);
/**
* forEach方法
*
* default void forEach(Consumer<? super T> action) {
* Objects.requireNonNull(action);
* for (T t : this) {
* action.accept(t);
* }
* }
*
*
* Consumer对象 函数式接口
* 如何表达函数式接口
* 1.匿名内部类
* childStrList.forEach(new Consumer<String>() {
* @Override
* public void accept(String s) {
* System.out.println(s);
* }
* });
* 2.lambda表达式
* childStrList.forEach((String child) -> {
* System.out.println(child);
* });
*
* 3.方法引用: System.out::println 与 void accept(String s)签名一样就可以
* childStrList.forEach(System.out::println);
*/
/*childStrList.forEach((String child) -> {
System.out.println(child);
});*/
// childStrList.forEach(System.out::println);
//方法引用方式,引用自己的打印
// childStrList.forEach(CuratorClient::print);
//判断节点是否存在,存在则返回状态,不存在返回null
// Stat existStat = curatorClient.client.checkExists().forPath(ROOT_NODE);
//System.out.println(existStat);
//判断节点是否存在,如果父节点不存在则创建父节点路径,子节点存在则返回状态,不存在返回null
//Stat existStat2 = curatorClient.client.checkExists().creatingParentsIfNeeded().forPath(ROOT_NODE + "2" + "/child2");
//System.out.println("节点存在:" + existStat2);
//删除节点 节点存在子节点, 删除失败,
//curatorClient.client.delete().forPath(ROOT_NODE);
//删除节点, 能删除掉带有子节点的节点
//curatorClient.client.delete().deletingChildrenIfNeeded().forPath(ROOT_NODE);
//删除节点 guaranteed() 这个方法确保删除,只要zookeeper接到这个指令,即使客户端与zookeeper断链,zookeeper也会删除节点
//curatorClient.client.delete().guaranteed().deletingChildrenIfNeeded().forPath(ROOT_NODE);
//zookeeper节点监听(watcher) 在获取数据的时候对该节点增加监听watcher,但是只能监听一次
//他监听的节点必须存在
/*curatorClient.client.getData().usingWatcher((CuratorWatcher)(WatchedEvent event) ->{
System.out.println("对节点[" + event.getPath() +"] 进行了操作,操作类型:" + event.getType().name());
}).forPath(ROOT_NODE);*/
//zookeeper节点监听 程序不退出时的永久监听,true 表示数据压缩,节省空间
//他监听的节点可以不存在
//NodeCache noderCache = new NodeCache(curatorClient.client,"/curator15" ,false);
//默认是不获取数据
//noderCache.start();
//true 表示把节点的数据获取出来
//noderCache.start(true);
//如果true可以这么获取到数据
//creteSystem.out.println("nodeCache data:" + new String(noderCache.getCurrentData().getData()));
/*noderCache.getListenable().addListener(() -> {
System.out.println("nodeCache 监听事件....");
System.out.println("节点数据:" + new String(noderCache.getCurrentData().getData()));
System.out.println("节点:" + noderCache.getPath());
});*/
//监听子节点 true表示缓存数据
//如果/root不存在,会被创建出来
PathChildrenCache childrenCache = new PathChildrenCache(curatorClient.client,"/root",true);
//启动监听
childrenCache.start(PathChildrenCache.StartMode.NORMAL);
//监听器PathChildrenCacheListener
//只能监听一层子节点
childrenCache.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
System.out.println("子节点监听事件了...");
System.out.println("TYPE:" + event.getType().name());
System.out.println(new String(event.getData().getData()));
event.getInitialData().forEach((ChildData data) -> {
System.out.println("初始化数据"+data.getPath());
});
});
Thread.sleep(1000000);
}
public static void print(String string){
System.out.println("curator:" + string);
}
}
curator支持三种cache: pathChildrenCache,NodeCache,TreeCache
1.NodeCache 对于指定节点监听,监听器注册接口为NodeCacheListener;
2.pathChildrenCache对于指定节点的子节点(第一级)进行监听,监听器注册接口PathChildrenCacheListener.
3.TreeCache 对于所有节点监听,监听器注册接口为TreeCacheListener
TreeCache 对指定节点及其所有子节点进行监听
监听代码如下:
public class CuratorCilent1 {
private CuratorFramework client = null;
private static final String ZK_ADRESS = "127.0.0.1";
private static final String ROOT = "/tree";
public CuratorCilent1() {
RetryPolicy retryPolicy = new RetryNTimes(2,1000);
client = CuratorFrameworkFactory.newClient(ZK_ADRESS,retryPolicy);
client.start();
}
public static void main(String[] args) throws Exception {
CuratorCilent1 curatorCilent = new CuratorCilent1();
System.out.println("连接成功..clien:" + curatorCilent.client);
//对于指定节点下的所有节点进行监听
TreeCache treeCache = new TreeCache(curatorCilent.client,ROOT);
treeCache.start();
//添加监听事件
treeCache.getListenable().addListener((CuratorFramework client, TreeCacheEvent event) -> {
System.out.println("Tree事件监听到了....");
System.out.println(event.getType().name());
if (event.getData() != null){
System.out.println("监听的路径:"+event.getData().getPath());
System.out.println("监听的数据:"+new String(event.getData().getData()));
System.out.println("监听的状态:"+event.getData().getStat());
}
});
Thread.sleep(10000000);
}
}