大数据学习

Zookeeper基础,客户端使用,Java / Scala A

2020-09-09  本文已影响0人  xiaogp

zookeeper基础


Znode节点类型
有四种类型的znode:

zookeeper客户端操作

(1).使用zookeeper-client连接zookeeper
不指定-server默认localhost:2181

zookeeper-client -server chd04:2181

(2). help命令

zookeeper-client help
ZooKeeper -server host:port cmd args
    stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history 

(3). get命令
get命令用于获取节点的数据,注意节点的路径必须是以/开头的路径。

get /kafka/config/topics/test_gp
[zk: localhost:2181(CONNECTED) 6] get /kafka/config/topics/test_gp
{"version":1,"config":{}}
cZxid = 0x35006dcb7f
ctime = Thu Jul 02 18:31:37 CST 2020
mZxid = 0x35006dcb7f
mtime = Thu Jul 02 18:31:37 CST 2020
pZxid = 0x35006dcb7f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0

数据是{"version":1,"config":{}},其他参数说明
cZxid:节点创建时的zxid
ctime:节点创建时间
mZxid:节点最近一次更新时的zxid
mtime:节点最近一次更新的时间
cversion:子节点数据更新次数
dataVersion:本节点数据更新次数
aclVersion:节点ACL(授权信息)的更新次数
ephemeralOwner:如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0
dataLength:节点数据长度是25
numChildren:子节点个数是0

(4). stat命令
该参数的输出等同于的get的额外信息输出。

[zk: localhost:2181(CONNECTED) 7] stat /kafka/config/topics/test_gp
cZxid = 0x35006dcb7f
ctime = Thu Jul 02 18:31:37 CST 2020
mZxid = 0x35006dcb7f
mtime = Thu Jul 02 18:31:37 CST 2020
pZxid = 0x35006dcb7f
cversion = 0

(5). create命令
create创建节点

[zk: localhost:2181(CONNECTED) 10] create /test some-value
Created /test

子节点只能在父节点上依次创建,现在test下创建a存储null,再在a下创建value存储false

[zk: localhost:2181(CONNECTED) 107] create /test/a null
Created /test/a
[zk: localhost:2181(CONNECTED) 108] create /test/a/value false
Created /test/a/value

(6). set命令
set命令用于设置和修改节点的数据

[zk: localhost:2181(CONNECTED) 16] set /test other-value
cZxid = 0x3b0011a50d
ctime = Wed Sep 09 17:17:30 CST 2020
mZxid = 0x3b0011a548
mtime = Wed Sep 09 17:19:49 CST 2020
pZxid = 0x3b0011a50d
cversion = 0

(7). ls命令
ls命令用于获取路径下的节点信息,路径为绝对路径。

[zk: localhost:2181(CONNECTED) 23] ls /kafka/config
[changes, clients, topics]
ls2命令

ls2命令比ls命令多输出本节点信息

[zk: localhost:2181(CONNECTED) 24] ls2 /kafka/config
[changes, clients, topics]
cZxid = 0x20000014c
ctime = Thu Mar 14 14:40:56 CST 2019
mZxid = 0x20000014c
mtime = Thu Mar 14 14:40:56 CST 2019

(8). history命令
history用于列出最近的命令历史。

[zk: localhost:2181(CONNECTED) 25] history
15 - ls /test
16 - set /test other-value
17 - get /test
18 - ls
19 - ls /
20 - ls /kafka

(9). delete命令
delete命令用于删除节点。

[zk: localhost:2181(CONNECTED) 28] delete /test

如果该节点有子节点,则报错非空节点

[zk: localhost:2181(CONNECTED) 86] delete /test
Node not empty: /test
rmr命令

删除节点,有子节点一并删除

[zk: localhost:2181(CONNECTED) 90] rmr /test

(10). quit命令
退出客户端。

Java / Scala API使用案例

分别使用Java,Scala滴啊用zookeeper API创建ZK单例,实现读取和修改ZK节点数据。
Java

import org.apache.zookeeper.*;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.Stat;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ZKUtils implements Watcher {
    private static ZKUtils instance;
    private static ZooKeeper zk;

    private ZKUtils(Properties prop) {
        try {
            zk = new ZooKeeper(prop.getProperty("ZK_HOST"), 20000, this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static ZKUtils getInstance(Properties prop) {
        if (instance == null) {
            synchronized (ZKUtils.class) {
                if (instance == null) {
                    try {
                        instance = new ZKUtils(prop);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return instance;
    }

    public void process(WatchedEvent watchedEvent) {

    }

    public String getData(String path) throws KeeperException, InterruptedException {
        PathUtils.validatePath(path);
        Stat stat = zk.exists(path, false);
        String res = null;
        if (stat != null) {
            res = new String(zk.getData(path, false, stat));
        }
        return res;
    }

    public void createPath(String path) throws KeeperException, InterruptedException {
        // 验证路径有效性
        PathUtils.validatePath(path);
        List<String> paths = Arrays.asList(path.split("/"));
        for (int i = 2; i <= paths.size(); i ++) {
            String tmpPath = "/" + String.join("/", paths.subList(1, i));
            // 如果路径为空
            if (zk.exists(tmpPath, false) == null) {
                // 一层一层创建, value为空串.acl权限,节点类型
                zk.create(tmpPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
    }

    public void setData(String path, String data) throws KeeperException, InterruptedException {
        PathUtils.validatePath(path);
        if (zk.exists(path, false) == null) {
            this.createPath(path);
        }
        // -1是最新版本
        zk.setData(path, data.getBytes(), -1);
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        Properties prop = new Properties();
        prop.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03");
        // 读取zk数据
        System.out.println(ZKUtils.getInstance(prop).getData("/test"));
        // 创建路径,节点存储值空串
        ZKUtils.getInstance(prop).createPath("/test/gp/b/c");
        // 修改节点数据
        ZKUtils.getInstance(prop).setData("/test/gp/b/c", "success");
    }
}

scala

import org.apache.zookeeper._
import org.apache.zookeeper.common._
import java.util.Properties

import org.apache.zookeeper.data.Stat

class ZkUtilities() extends Watcher {
  private var zk: ZooKeeper = _
  private var pro: Properties = _
  private def this(pro: Properties) = {
    this()
    this.pro = pro
    zk = new ZooKeeper(pro.getProperty("ZK_HOST"), 20000, this)
  }

  @Override
  def process(event: WatchedEvent) {
    //no watching
  }

  private def createPath(path: String, zk: ZooKeeper): Unit = {
    //创建路径
    PathUtils.validatePath(path)
    var parentPath = path.substring(0, path.indexOf("/", 0) + 1)
    while (parentPath != null && !parentPath.isEmpty) {
      if (zk.exists(parentPath, false) == null) {
        // 同步创建,acl权限完全开放,节点类型持久化节点
        zk.create(parentPath, "".getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      }

      if (parentPath.equals(path)) {
        parentPath = null
      } else {
        var idx = path.indexOf("/", parentPath.length() + 1)
        if (-1 == idx) {
          idx = path.length
        }
        parentPath = path.substring(0, idx)
      }
    }
  }

  def setData(path: String, data: String): Stat = {
    if (zk.exists(path, false) == null) {
      createPath(path, zk)
    }
    zk.setData(path, data.getBytes, -1)
  }

  def getData(path: String): String = {
    val stat = zk.exists(path, false)
    if (stat == null) {
      null
    } else {
      new String(zk.getData(path, false, stat))
    }
  }
}

object ZkUtilities {
  private var zkUtilities: ZkUtilities = _
  def getIns(prop: Properties): ZkUtilities = {
    this.synchronized {
      if (zkUtilities == null) {
        zkUtilities = new ZkUtilities(prop)
      }
      zkUtilities
    }
  }

  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
    var res = ZkUtilities.getIns(prop).getData("/test/a/value")
    println(res)  // false
    ZkUtilities.getIns(prop).setData("/test/a/value", "true")
    res = ZkUtilities.getIns(prop).getData("/test/a/value")
    println(res)  // true
  }
}
上一篇 下一篇

猜你喜欢

热点阅读