大数据进阶

Zookeeper的使用

2019-05-23  本文已影响59人  lizb

对于Zookeeper的介绍和安装教程这里不再赘述,网上很多资料都介绍得很专业,上一篇文章在搭建Hadoop集群的时候也简单介绍了下。本篇主要内容如下:
1、Zookeeper客户端的基本操作
2、Zookeeper中api的使用
3、Zookeeper的选举流程

一、Zookeeper客户端的基本操作

进入zookeeper的安装目录下,在bin文件夹下面有一个zkCli.sh ,这个是zookeeper提供给客户端使用的脚步,在服务端已经启动好的情况下,直接启动这个脚本sh zkCli.sh,即可进入交互模式。

1、get:获取节点数据

[zk: localhost:2181(CONNECTED) 11] get /demo
hello
cZxid = 0x2000000b4
ctime = Mon May 13 12:11:16 CST 2019
mZxid = 0x2000000b4
mtime = Mon May 13 12:11:16 CST 2019
pZxid = 0x2000000b4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

各项说明:
cZxid :创建节点的id
ctime : 节点的创建时间
mZxid :修改节点的id
mtime :修改节点的时间
pZxid :子节点的id
cversion : 子节点的版本
dataVersion : 当前节点数据的版本
aclVersion :权限的版本
ephemeralOwner :判断是否是临时节点
dataLength : 数据的长度
numChildren :子节点的数量

2、create:创建节点
基本格式:create [-s] [-e] path data acl

创建持久化节点目录:

[zk: localhost:2181(CONNECTED) 9] create /demo hello 
Created /demo
[zk: localhost:2181(CONNECTED) 11] get /demo
hello
cZxid = 0x2000000b4
ctime = Mon May 13 12:11:16 CST 2019
mZxid = 0x2000000b4
mtime = Mon May 13 12:11:16 CST 2019
pZxid = 0x2000000b4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

创建临时节点目录:create -e

[zk: localhost:2181(CONNECTED) 19] create -e /tempnode hello
Created /tempnode
[zk: localhost:2181(CONNECTED) 20] get /tempnode
hello
cZxid = 0x2000000b7
ctime = Mon May 13 12:26:18 CST 2019
mZxid = 0x2000000b7
mtime = Mon May 13 12:26:18 CST 2019
pZxid = 0x2000000b7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0xc969fb77db58000a
dataLength = 5
numChildren = 0

创建PERSISTENT_SEQUENTIAL类型的节点: create -s

[zk: localhost:2181(CONNECTED) 22] create -s /demo/sec seq
Created /demo/sec0000000000
[zk: localhost:2181(CONNECTED) 23] create -s /demo/sec seq
Created /demo/sec0000000001
[zk: localhost:2181(CONNECTED) 24] create -s /demo/sec seq
Created /demo/sec0000000002
[zk: localhost:2181(CONNECTED) 25] ls /demo
[sec0000000000, sec0000000001, sec0000000002]

ls命令显示查看当前节点情况。可以看到这里创建了顺序节点,顺序节点会自动累加。

3、修改节点:set path data [version]

[zk: localhost:2181(CONNECTED) 26] get /demo
hello
cZxid = 0x2000000b4
ctime = Mon May 13 12:11:16 CST 2019
mZxid = 0x2000000b4
mtime = Mon May 13 12:11:16 CST 2019
pZxid = 0x2000000ba
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 3
[zk: localhost:2181(CONNECTED) 27] set /demo hello-1
cZxid = 0x2000000b4
ctime = Mon May 13 12:11:16 CST 2019
mZxid = 0x2000000bb
mtime = Mon May 13 12:36:04 CST 2019
pZxid = 0x2000000ba
cversion = 3
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 3

修改后,节点内容变为了新的内容,并且数据版本dataVersion由0变为了1,我们再来修改指定版本的数据内容:

[zk: localhost:2181(CONNECTED) 28] set /demo hello-2 0
version No is not valid : /demo

看到这里我们对0版本的数据进行更新会出错,表示版本不可用了,这是因为上一步我们已经改了一次,数据版本变为了1,我们把命令中的0换成1试试:

[zk: localhost:2181(CONNECTED) 29] set /demo new-hello2 1
cZxid = 0x2000000b4
ctime = Mon May 13 12:11:16 CST 2019
mZxid = 0x2000000bd
mtime = Mon May 13 12:41:33 CST 2019
pZxid = 0x2000000ba
cversion = 3
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 3

可以看到这样就可以了,这就是指定版本的数据修改。

4、删除节点:delete path [version]

[zk: localhost:2181(CONNECTED) 34] ls /demo     
[sec0000000000, sec0000000001, sec0000000002]
[zk: localhost:2181(CONNECTED) 35] delete /demo/sec0000000000
[zk: localhost:2181(CONNECTED) 36] delete /demo/sec0000000001
[zk: localhost:2181(CONNECTED) 37] delete /demo/sec0000000002
[zk: localhost:2181(CONNECTED) 38] ls /demo
[]

二、Zookeeper中api的使用

1、java中对节点的增、删、改、查

package com.lzb.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;

public class MyZKClient {

    public static void main(String[] args) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {

            String servers = "192.168.218.106,192.168.218.107,192.168.218.108,192.168.218.109";

            ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        countDownLatch.countDown();
                    }
                }
            });

            System.out.println("await...");
            countDownLatch.await();

            System.out.println("zk.state==="+zk.getState());

            String node = "/app";
            Stat stat = zk.exists(node, false);
            if(stat == null){
                System.out.println("创建节点...");
                String createResult = zk.create(node, "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("createResult==="+createResult);
                stat = zk.exists(node, false);
            }

            byte[] data = zk.getData(node, false, stat);
            System.out.println("当前(data,version)===(" + new String(data) + "," + stat.getVersion() + ")");

            stat = zk.setData(node, "hello-update".getBytes(), stat.getVersion());
            byte[] data2 = zk.getData(node, false, stat);
            System.out.println("更新后(data,version)===(" + new String(data2) + "," + stat.getVersion() + ")");


            zk.delete(node,stat.getVersion());
            stat = zk.exists(node, false);
            if(stat == null){
                System.out.println("delete complete!");
            }

            zk.close();


        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

运行结果:

await...
Watch===None
zk.state===CONNECTED
创建节点...
createResult===/app
当前(data,version)===(hello,0)
更新后(data,version)===(hello-update,1)
delete complete!

Process finished with exit code 0

上面的代码只是一个简单的增删改查,当我们想要监控某个节点的情况的时候,得用到zookeeper里面的一个叫Watcher的类,它相当于为某个节点的某个事件注册一个监听器,当事件触发的时候,就会回调这个Watcher。

2、使用Watcher监听节点的情况

Watcher 设置是开发中最常见的,需要搞清楚watcher的一些基本特征,对于exists、getdata、getchild对于节点的不同操作会收到不同的 watcher信息:

state=-112 会话超时状态
state= -113 认证失败状态
state= 1 连接建立中
state= 2 (暂时不清楚如何理解这个状态,ZOO_ASSOCIATING_STATE)
state=3 连接已建立状态
state= 999 无连接状态

type=1 创建节点事件
type=2 删除节点事件
type=3 更改节点事件
type=4 子节点列表变化事件
type= -1 会话session事件
type=-2 监控被移除事件

对父节点的变更以及孙节点的变更都不会触发watcher,而对watcher本身节点以及子节点的数量变更会触发watcher,具体参照下表:

下面我们通过来演示watcher对节点的监控,代码如下:

package com.lzb.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class NodeMonitor {

    private ZooKeeper zk;
    private String servers = "192.168.218.106,192.168.218.107,192.168.218.108,192.168.218.109";
    private int sessionTimeout = 30*60*1000;
    private String node;


    /**
     * 开始监控
     * @param node 节点路径
     */
    public void startMonitor(String node){
        this.node = node;
        connectZookeeper();
        monitorExist();
        monitorGetData();
        monitorGetChildren();

        //目的是让程序不终止
        while (true){
            try {
                Thread.sleep(sessionTimeout);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 连接zookeeper
     */
    private void connectZookeeper(){
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);

            zk = new ZooKeeper(servers, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent.getState());
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();

                    }
                }
            });

            countDownLatch.await();

        }catch (Exception e){
            e.printStackTrace();
        }

    }

    /**
     * 监听节点触发exist的事件
     */
    private void monitorExist() {
        System.out.println("触发事件==>回调exist");
        try {

            Stat stat = zk.exists(node, new Watcher() {
                public void process(WatchedEvent event) {
                    //注意因为Watcher的使用是一次性的,也就是说使用后它就会被移除,所以这里我们需要再次挂上监听器
                    monitorExist();
                }
            });

            if(stat == null){
                System.out.println(node + " 节点不存在");
            }else {
                System.out.println(node + " 节点存在");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 监听节点触发getData的事件
     */
    private void monitorGetData() {
        System.out.println("触发事件==>回调getData");

        try {
            Stat stat = zk.exists(node, false);
            if(stat == null) {
                System.out.println(node + " 节点不存在");
                return;
            }

            byte[] bytes = zk.getData(node, new Watcher() {
                public void process(WatchedEvent event) {
                    //注意因为Watcher的使用是一次性的,也就是说使用后它就会被移除,所以这里我们需要再次挂上监听器
                    monitorGetData();
                }
            }, null);

            String data = new String(bytes);

            System.out.println(node + "节点的数据 === " + data);

        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    /***
     * 监听节点触发getChildren的事件
     */
    private void monitorGetChildren() {
        System.out.println("触发事件==>回调getChildren");

        try {
            Stat stat = zk.exists(node, false);
            if(stat == null){
                System.out.println(node + " 节点不存在");
                return;
            }

            List<String> children = zk.getChildren(node, new Watcher() {
                public void process(WatchedEvent event) {
                    //注意因为Watcher的使用是一次性的,也就是说使用后它就会被移除,所以这里我们需要再次挂上监听器
                    monitorGetChildren();
                }
            });

            System.out.print(node + "节点的子节点 === [");
            for (String name : children ) {
                System.out.print(name + " ");
            }
            System.out.print("]\n");

        }catch(Exception e) {
            e.printStackTrace();
        }
    }

}

为了实现能自动挂上监听器,这里我们先通过zkCli.sh客户端在zookeeper上创建一个节点"/app":

[zk: localhost:2181(CONNECTED) 99] create /app hello-app
Created /app

然后运行主类代码:

package com.lzb.demo;

public class Demo {
    public static void main(String[] args) {
        NodeMonitor monitor = new NodeMonitor();
        monitor.startMonitor("/app");
    }
}

打印信息如下:


获取节点信息

接着我们在客户端上更新"/app"的数据:

[zk: localhost:2181(CONNECTED) 104] set /app hello-app-new
cZxid = 0x200000170
ctime = Tue May 14 13:59:16 CST 2019
mZxid = 0x200000174
mtime = Tue May 14 14:01:21 CST 2019
pZxid = 0x200000170
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
[zk: localhost:2181(CONNECTED) 105] 

控制台这时打印信息如下:


更新节点数据

现在我们在"/app"下创建3个子节点:

[zk: localhost:2181(CONNECTED) 105] create /app/sub1 hello-sub1
Created /app/sub1
[zk: localhost:2181(CONNECTED) 106] create /app/sub2 hello-sub2
Created /app/sub2
[zk: localhost:2181(CONNECTED) 107] create /app/sub3 hello-sub3
Created /app/sub3
[zk: localhost:2181(CONNECTED) 108] ls /app
[sub2, sub3, sub1]

这时控制台打印信息如下:


创建子节点

然后我们随便修改一个子节点的数据或者创建一个孙节点:

[zk: localhost:2181(CONNECTED) 109] set /app/sub1 hello-sub1-new
cZxid = 0x200000175
ctime = Tue May 14 14:04:30 CST 2019
mZxid = 0x200000178
mtime = Tue May 14 14:07:41 CST 2019
pZxid = 0x200000175
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0
[zk: localhost:2181(CONNECTED) 113] create /app/sub1/ssub1 hello-ssub1
Created /app/sub1/ssub1

但是发现控制台依旧还是上面的那样,并没有打印子节点的数据。这是因为我们在代码中是对"/app"这个节点进行监控的,对其子节点的数据修改触发不了本节点的事件,所以才会出现这种情况,如果想获取其子节点的数据信息可以单独监控这个子节点来即可。

最后我们依次删除子节点:

[zk: localhost:2181(CONNECTED) 133] delete /app/sub1/ssub1
[zk: localhost:2181(CONNECTED) 134] delete /app/sub1
[zk: localhost:2181(CONNECTED) 135] delete /app/sub2
[zk: localhost:2181(CONNECTED) 136] delete /app/sub3
[zk: localhost:2181(CONNECTED) 139] delete /app

控制台打印信息如下:


删除子节点和本节点

OK,这就是zookeeper的Watcher监听器的使用。

三、Zookeeper的选举流程

Zookeeper的选举算法有三种:LeaderElection、AuthFastLeaderElection、FastLeaderElection。
其中FastLeaderElection是默认的算法,前两种是过时了的。这里只简要罗列一下FastLeaderElection的选举流程(详细介绍请参见http://www.cnblogs.com/lpshou/archive/2013/06/14/3136738.html):

假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么:

1. 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态

2. 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持
LOOKING状态.

3. 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.

4. 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.

5. 服务器5启动,同4一样,当小弟.

上一篇下一篇

猜你喜欢

热点阅读