zookeeper 客户端使用
2020-02-23 本文已影响0人
香沙小熊
1.原生
public class ZKWatch implements Watcher {
private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private ZooKeeper zooKeeper;
public ZKWatch() {
try {
zooKeeper = new ZooKeeper(CONNECT_ADDR, 5000, this);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws KeeperException, InterruptedException {
ZKWatch zkWatch = new ZKWatch();
if(zkWatch.exists("/xionghu",true)!=null){
zkWatch.deleteRecursive("/xionghu");
}
zkWatch.addPZnode("/xionghu", "2020");
//zkWatch.addPZnode("/xionghu/aaa", "2019");
Thread.sleep(2000000);
}
public Stat exists(String path,boolean watch) throws KeeperException, InterruptedException {
return zooKeeper.exists(path, watch);
}
public void delete(String path) throws KeeperException, InterruptedException {
zooKeeper.delete(path, -1);
}
public void deleteRecursive(String path) throws KeeperException, InterruptedException {
ZKUtil.deleteRecursive(zooKeeper, path);
}
/**
* 创建znode结点
*
* @param path 结点路径
* @param data 结点数据
* @return true 创建结点成功 false表示结点存在
* @throws Exception
*/
public boolean addZnodeData(String path, String data, CreateMode mode) {
try {
if (zooKeeper.exists(path, true) == null) {
zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
return true;
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("创建znode:" + path + "出现问题!!", e);
}
System.out.println("znode" + path + "结点已存在");
return false;
}
public boolean addZnodeData(String path, String data) {
try {
zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
} catch (Exception e) {
//logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
return false;
}
}
/**
* 创建永久znode结点
*
* @param path 结点路径
* @param data 结点数据
* @return true 创建结点成功 false表示结点存在
* @throws Exception
*/
public boolean addPZnode(String path, String data) {
return addZnodeData(path, data, CreateMode.PERSISTENT);
}
/**
* 创建临时znode结点
*
* @param path 结点路径
* @param data 结点数据
* @return true 创建结点成功 false表示结点存在
* @throws Exception
*/
public boolean addZEnode(String path, String data) {
return addZnodeData(path, data, CreateMode.EPHEMERAL);
}
/**
* 修改znode
*
* @param path 结点路径
* @param data 结点数据
* @return 修改结点成功 false表示结点不存在
*/
public boolean updateZnode(String path, String data) {
try {
Stat stat = null;
if ((stat = zooKeeper.exists(path, true)) != null) {
zooKeeper.setData(path, data.getBytes(), stat.getVersion());
return true;
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("修改znode:" + path + "出现问题!!", e);
}
return false;
}
/**
* 删除结点
*
* @param path 结点
* @return true 删除键结点成功 false表示结点不存在
*/
public boolean deleteZnode(String path) {
try {
Stat stat = null;
if ((stat = zooKeeper.exists(path, true)) != null) {
List<String> subPaths = zooKeeper.getChildren(path, false);
if (subPaths.isEmpty()) {
zooKeeper.delete(path, stat.getVersion());
return true;
} else {
for (String subPath : subPaths) {
deleteZnode(path + "/" + subPath);
}
}
}
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException("删除znode:" + path + "出现问题!!", e);
}
return false;
}
/**
* 取到结点数据
*
* @param path 结点路径
* @return null表示结点不存在 否则返回结点数据
*/
public String getZnodeData(String path) {
String data = null;
try {
Stat stat = null;
if ((stat = zooKeeper.exists(path, true)) != null) {
data = new String(zooKeeper.getData(path, true, stat));
} else {
System.out.println("znode:" + path + ",不存在");
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("取到znode:" + path + "出现问题!!", e);
}
return data;
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("连接状态:"+watchedEvent.getState()+" "+ "时间类型 "+watchedEvent.getType()+
"受影响的path"+ watchedEvent.getPath());
}
}
2.ZkClient
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version></version>
</dependency>
public class ZkClientCrud<T> {
private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private ZkClient zkClient;
public ZkClientCrud() {
this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
}
/**
* 创建持久节点
*/
public void createPersistent(String path, Object data) {
zkClient.createPersistent(path, data);
}
public T readData(String path) {
return zkClient.readData(path);
}
//递归删除
public void deleteRecursive(String path) {
zkClient.deleteRecursive(path);
}
public void delete(String path) {
zkClient.delete(path);
}
/***
* 子节点
* @param path
* @return
*/
public List<String> getChildren(String path){
return zkClient.getChildren(path);
}
}
public class ZkClientWatcher {
private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZkClient zkClient;
public ZkClientWatcher() {
this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
}
public void createPersistent(String path, Object data) {
zkClient.createPersistent(path, data);
}
public void writeData(String path, Object object) {
zkClient.writeData(path, object);
}
public void delete(String path) {
zkClient.delete(path);
}
public boolean exists(String path) {
return zkClient.exists(path);
}
public void deleteRecursive(String path) {
zkClient.deleteRecursive(path);
}
//对父节点添加监听数据变化。
public void subscribe(String path) {
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf("删除的节点为:%s\r\n", dataPath);
}
});
}
//对父节点添加监听子节点变化。
public void subscribe2(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n");
}
});
}
//客户端状态
public void subscribe3(String path) {
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if (state == Watcher.Event.KeeperState.SyncConnected) {
//当我重新启动后start,监听触发
System.out.println("连接成功");
} else if (state == Watcher.Event.KeeperState.Disconnected) {
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
} else {
System.out.println("其他状态" + state);
}
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
3.Curator
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.Map;
/**
* @date: 2020/2/23 15:25
* @author: xionghu
* @desc:
*/
public class CuratorUtil {
private CuratorFramework client;
public CuratorUtil(String zkAddress) {
client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener(new NodeEventListener());
client.start();
}
/**
* 创建node
*
* @param nodeName
* @param value
* @return
*/
public boolean createNode(String nodeName, String value) {
boolean suc = false;
try {
Stat stat = getClient().checkExists().forPath(nodeName);
if (stat == null) {
String opResult = null;
if (Strings.isNullOrEmpty(value)) {
opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
}
else {
opResult =
getClient().create().creatingParentsIfNeeded()
.forPath(nodeName, value.getBytes(Charsets.UTF_8));
}
suc = Objects.equal(nodeName, opResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
return suc;
}
/**
* 更新节点
*
* @param nodeName
* @param value
* @return
*/
public boolean updateNode(String nodeName, String value) {
boolean suc = false;
try {
Stat stat = getClient().checkExists().forPath(nodeName);
if (stat != null) {
Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
suc = opResult != null;
}
}
catch (Exception e) {
e.printStackTrace();
}
return suc;
}
/**
* 删除节点
*
* @param nodeName
*/
public void deleteNode(String nodeName) {
try {
getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
}
catch (Exception e) {
e.printStackTrace();
}
}
/**
* 找到指定节点下所有子节点的名称与值
*
* @param node
* @return
*/
public Map<String, String> listChildrenDetail(String node) {
Map<String, String> map = Maps.newHashMap();
try {
GetChildrenBuilder childrenBuilder = getClient().getChildren();
List<String> children = childrenBuilder.forPath(node);
GetDataBuilder dataBuilder = getClient().getData();
if (children != null) {
for (String child : children) {
String propPath = ZKPaths.makePath(node, child);
map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
}
}
}
catch (Exception e) {
e.printStackTrace();
}
return map;
}
/**
* 列出子节点的名称
*
* @param node
* @return
*/
public List<String> listChildren(String node) {
List<String> children = Lists.newArrayList();
try {
GetChildrenBuilder childrenBuilder = getClient().getChildren();
children = childrenBuilder.forPath(node);
}
catch (Exception e) {
e.printStackTrace();
}
return children;
}
/**
* 增加监听
*
* @param node
* @param isSelf
* true 为node本身增加监听 false 为node的子节点增加监听
* @throws Exception
*/
public void addWatch(String node, boolean isSelf) throws Exception {
if (isSelf) {
getClient().getData().watched().forPath(node);
}
else {
getClient().getChildren().watched().forPath(node);
}
}
/**
* 增加监听
*
* @param node
* @param isSelf
* true 为node本身增加监听 false 为node的子节点增加监听
* @param watcher
* @throws Exception
*/
public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
if (isSelf) {
getClient().getData().usingWatcher(watcher).forPath(node);
}
else {
getClient().getChildren().usingWatcher(watcher).forPath(node);
}
}
/**
* 增加监听
*
* @param node
* @param isSelf
* true 为node本身增加监听 false 为node的子节点增加监听
* @param watcher
* @throws Exception
*/
public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
if (isSelf) {
getClient().getData().usingWatcher(watcher).forPath(node);
}
else {
getClient().getChildren().usingWatcher(watcher).forPath(node);
}
}
/**
* 销毁资源
*/
public void destory() {
if (client != null) {
client.close();
}
}
/**
* 获取client
*
* @return
*/
public CuratorFramework getClient() {
return client;
}
}
final class NodeEventListener implements CuratorListener {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println(event.toString() + ".......................");
final WatchedEvent watchedEvent = event.getWatchedEvent();
if (watchedEvent != null) {
System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
switch (watchedEvent.getType()) {
case NodeChildrenChanged:
break;
case NodeDataChanged:
// TODO
break;
default:
break;
}
}
}
}
}
public class CuratorTest {
private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
CuratorUtil curator = new CuratorUtil(CONNECT_ADDR);
curator.createNode("/root/test1", "abc1");
curator.createNode("/root/test2", "abc2");
curator.updateNode("/root/test2", "abc3");
List<String> list = curator.listChildren("/root");
Map<String, String> map = curator.listChildrenDetail("/root");
// curator.deleteNode("/zkroot");
// curator.destory();
System.out.println("=========================================");
for (String str : list) {
System.out.println(str);
}
System.out.println("=========================================");
for (Map.Entry<String, String> entry : map.entrySet()) {
System.out.println(entry.getKey() + "=>" + entry.getValue());
}
// 增加监听
curator.addWatch("/root", false);
TimeUnit.SECONDS.sleep(600);
}
}