大数据 - (六-1)- Zookeeper

2020-08-27  本文已影响0人  啦啦啦喽啰

Zookeeper简介

Zookeeper是什么?

Zookeeper架构组成

image.png

Leader

Follower

Observer

Zookeeper特点

Zookeeper环境搭建

安装
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
创建文件目录
mkdir -p /opt/servers/zookeeper/data/logs
# cd /opt/servers/zookeeper/conf
# mv zoo_sample.cfg zoo.cfg
# vi  zoo.cfg
#更新datadir
dataDir=/opt/servers/zookeeper/data
#增加logdir
dataLogDir=/opt/servers/zookeeper/data/logs
#增加集群配置
##server.服务器ID=服务器IP地址:服务器之间通信端⼝:服务器之间投票选举端⼝
server.1=os1:2888:3888
server.2=os2:2888:3888
server.3=os3:2888:3888
#打开注释
#ZK提供了⾃动清理事务⽇志和快照⽂件的功能,这个参数指定了清理频率,单位是⼩时
autopurge.purgeInterval=1
cd /opt/servers/zookeeper/data
echo 1 > myid
# os2
echo 2 > myid
# os3
echo 3 > myid
# 三台节点分别执行
/opt/servers/zookeeper/bin/zkServer.sh start
vi zk.sh
#!/bin/sh
echo "start zookeeper server..."
if(($#==0));then
echo "no params";
exit;
fi
hosts="os1 os2 os3"
for host in $hosts
do
ssh $host "source /root/.bash_profile; /opt/servers/zookeeper/bin/zkServer.sh $1"
done

Zookeeper数据结构与监听机制

ZooKeeper数据模型Znode

ZNode 的类型
事务ID

ZNode 的状态信息

#使⽤zkCli.sh 连接到zk集群
[zk: localhost:2181(CONNECTED) 2] get /zookeeper

cZxid = 0x0
ctime = Wed Dec 31 19:00:00 EST 1969
mZxid = 0x0
mtime = Wed Dec 31 19:00:00 EST 1969
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

Watcher 机制

主要包括:
具体⼯作流程为

Zookeeper的基本使⽤

ZooKeeper命令⾏操作

./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port(2181) 连接指定的服务器

Zookeeper-开源工具

ZkClient

ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能

添加依赖
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
    </dependencies>
创建会话
public class ZkDemo {
    public static void main(String[] args) {
        // 获取zk client 对象, 通讯端口2181
        // 建立会话
        ZkClient zkClient = new ZkClient("tctj1:2181");
        System.out.println("zkclient is ready, ZooKeeper session established.");

        // 创建节点, createParents的值设置为true,可以递归创建节点
        zkClient.createPersistent("/lg-zkClient/lg-c1",true);
        System.out.println("success create znode.");
    }
}
创建节点
public class Create_Node_Sample {
 public static void main(String[] args) {
 ZkClient zkClient = new ZkClient("127.0.0.1:2181");
 System.out.println("ZooKeeper session established.");
 //createParents的值设置为true,可以递归创建节点
 zkClient.createPersistent("/lg-zkClient/lg-c1",true);
 System.out.println("success create znode.");
 }
}
删除节点
public class Del_Data_Sample {
 public static void main(String[] args) throws Exception {
 String path = "/lg-zkClient/lg-c1";
 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
 zkClient.deleteRecursive(path);
 System.out.println("success delete znode.");
 }
}
监听节点变化
public class Get_Child_Change {
    public static void main(String[] args) throws InterruptedException {
        // 获取zclient
        ZkClient zkClient = new ZkClient("tctj1:2181");
        String path = "/lg-client";
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String s, List<String> list) throws Exception {
                //打印节点信息
                System.out.println(s + " childs changes ,current childs " +
                        list);
            }
        });

        // 测试
        zkClient.createPersistent("/lg-client");
        Thread.sleep(1000); //只是为了⽅便观察结果数据
        zkClient.createPersistent("/lg-client/c1");
        Thread.sleep(1000);
        zkClient.delete("/lg-client/c1");
        Thread.sleep(1000);
        zkClient.delete("/lg-client");
        Thread.sleep(Integer.MAX_VALUE);
         /*
         1 监听器可以对不存在的⽬录进⾏监听
         2 监听⽬录下⼦节点发⽣改变,可以接收到通知,携带数据有⼦节点列表
         3 监听⽬录创建和删除本身也会被监听到
         */
    }
}
获取数据(节点是否存在、更新、删除)
public class Get_Data_Change {
    public static void main(String[] args) throws InterruptedException {
        // 获取zkClient对象
        ZkClient zkClient = new ZkClient("tctj1:2181");
        // 设置序列化
        zkClient.setZkSerializer(new ZkStrSerializer());

        //判断节点是否存在,不存在创建节点并赋值
        String path = "/lg-client1";
        final boolean exists = zkClient.exists(path);
        if (!exists) {
            zkClient.createEphemeral(path, "123");
        }

        // 监听变化
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
                //定义接收通知之后的处理逻辑
                System.out.println(s + " data is changed ,new data " +
                        o);
            }

            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s + " is deleted!!");
            }
        });

        // 测试
        final Object o = zkClient.readData(path);
        System.out.println(o);
        zkClient.writeData(path, "new data");
        Thread.sleep(1000);
        //删除节点
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class ZkStrSerializer implements ZkSerializer {
    public byte[] serialize(Object o) throws ZkMarshallingError {
        return String.valueOf(o).getBytes();
    }

    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return new String(bytes);
    }
}

Zookeeper内部原理

Leader选举

选举机制
首次启动

*(1)服务器1启动,此时只有它⼀台服务器启动了,它发出去的报⽂没有任何响应,所以它的选举状态⼀直是LOOKING状态。
*(2)服务器2启动,它与最开始启动的服务器1进⾏通信,互相交换⾃⼰的选举结果,由于两者都没有历史数据,所以id值较⼤的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个
例⼦中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
*(3)服务器3启动,根据前⾯的理论分析,服务器3成为服务器1、2、3中的⽼⼤,⽽与上⾯不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader
*(4)服务器4启动,根据前⾯的分析,理论上服务器4应该是服务器1、2、3、4中最⼤的,但是由于前⾯已经有半数以上的服务器选举了服务器3,所以它只能接收当follower
*(5)服务器5启动,同4⼀样称为follower

集群⾮⾸次启动

ZAB⼀致性协议

分布式数据⼀致性问题

ZAB

ZAB协议
ZK如何处理集群数据
⼴播消息
Leader 崩溃问题

ZooKeeper应用

服务器动态上下线监听

image.png
具体实现
服务器端
public class ServerMain {
    private ZkClient zkClient = null;

    //获取到zk对象
    private void connectZK(){
        zkClient = new ZkClient("tctj1:2181,tctj2:2181,tctj3:2181");
        if(!zkClient.exists("/servers")){
            zkClient.createPersistent("/servers");
        }
    }

    // 注册服务端信息到zk节点
    private void registerServerInfo(String ip,String port){
        //创建临时顺序节点
        final String path =
                zkClient.createEphemeralSequential("/servers/server", ip +":"+port);
        System.out.println("---->>> 服务器注册成功,ip="+ip+";port ="+port+";节点路径信息="+path);
    }

    public static void main(String[] args) {
        final ServerMain server = new ServerMain();
        server.connectZK();
        server.registerServerInfo(args[0],args[1] );
        //启动⼀个服务线程提供时间查询
        new TimeServer(Integer.parseInt(args[1])).start();
    }
}

public class TimeServer extends Thread {
    private int port=0;

    public TimeServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        //启动serversocket监听⼀个端⼝
        try {
            final ServerSocket serverSocket = new ServerSocket(port);
            while(true){
                final Socket socket = serverSocket.accept();
                final OutputStream out = socket.getOutputStream();
                out.write(new Date().toString().getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客户端
public class Client {
    //获取zkclient
    ZkClient zkClient = null;

    //维护⼀个serversi 信息集合
    ArrayList<String> infos = new ArrayList<String>();

    private void connectZk() {
        // 创建zkclient
        zkClient = new ZkClient("tctj1:2181,tctj2:2181");

        //第⼀次获取服务器信息,所有的⼦节点
        final List<String> childs = zkClient.getChildren("/servers");
        for (String child : childs) {
            //存储着ip+port
            final Object o = zkClient.readData("/servers/" + child);
            infos.add(String.valueOf(o));
        }

        //对servers⽬录进⾏监听
        zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
            public void handleChildChange(String s, List<String> children)
                    throws Exception {
                //接收到通知,说明节点发⽣了变化,client需要更新infos集合中的数据
                ArrayList<String> list = new ArrayList<String>();
                //遍历更新过后的所有节点信息
                for (String path : children) {
                    final Object o = zkClient.readData("/servers/" + path);
                    list.add(String.valueOf(o));
                }
                //最新数据覆盖⽼数据
                infos = list;
                System.out.println("--》接收到通知,最新服务器信息为:" + infos);
            }
        });
    }

    //发送时间查询的请求
    public void sendRequest() throws IOException {
        //⽬标服务器地址
        final Random random = new Random();
        final int i = random.nextInt(infos.size());
        final String ipPort = infos.get(i);
        final String[] arr = ipPort.split(":");
        //建⽴socket连接
        final Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));
        final OutputStream out = socket.getOutputStream();
        final InputStream in = socket.getInputStream();
        //发送数据
        out.write("query time".getBytes());
        out.flush();
        //接收返回结果
        final byte[] b = new byte[1024];
        in.read(b);//读取服务端返回数据
        System.out.println("client端接收到server:+" + ipPort + "+返回结果:" +
                new String(b));
        //释放资源
        in.close();
        out.close();
        socket.close();
    }

    public static void main(String[] args) throws InterruptedException {
        final Client client = new Client();
        client.connectZk(); //监听器逻辑
        while (true) {
            try {
                client.sendRequest(); //发送请求
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    client.sendRequest();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            //每隔⼏秒中发送⼀次请求到服务端
            Thread.sleep(2000);
        }
    }
}

分布式锁

Hadoop HA

HA 概述

单点存在的问题

HDFS-HA ⼯作机制

HDFS-HA⼯作要点

上一篇 下一篇

猜你喜欢

热点阅读