大数据

大数据详细教程系列(6)——ZooKeeper

2019-05-08  本文已影响0人  EVAO_8dac

1. ZooKeeper

开源的分布式的协调服务,是Google的Chubby一个开源的实现,它是一个为分布式应用提供一致性服务的软件

2. ZooKeeper提供的功能

3. ZooKeeper的特点

4. ZooKeeper的角色

5. ZooKeeper数据模型

6. ZooKeeper的节点

7. ZooKeeper中的选举机制

8. ZooKeeper集群搭建

安装zookeeper集群要求大于1的奇数台机器

8.1 准备安装包

zookeeper-3.4.6.tar.gz

8.2 解压

tar -zxvf zookeeper-3.4.6.tar.gz -C /opt/

8.3 重命名

mv zookeeper-3.4.6/ zookeeper

8.4 配置环境变量

export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=PATH:ZOOKEEPER_HOME/bin

8.5 配置zookeeper

cp zoo_sample.cfg zoo.cfg

dataDir=/opt/zookeeper/data 

8.6 myid文件

在zookeeper的各个机器中分别创建myid文件(/opt/zookeeper/data ),内容分别为

1 2 3

8.7 配置zoo.cfg

server.1=uplooking03:2888:3888
server.2=uplooking04:2888:3888
server.3=uplooking05:2888:3888

8.8 查看zookeeper集群的时间

保证zookeeper 集群中的时间不能有超过20秒的误差

ntpdate -u ntp.api.bz 根据时间同步服务器同步时间,-u是绕过防火请

8.9 启动zookeeper服务

zkServer.sh start

9. ZooKeeper

9.1 zookeeper的Shell操作

9.2 zookeeper原生的API操作

package com.uplooking.bigdata.zookeeper;

import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ZookeeperTest {
    private ZooKeeper zooKeeper;

    @Before
    public void init() throws Exception {
        String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
        zooKeeper = new ZooKeeper(connStr, 3000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("watch..." + watchedEvent.getType());
            }
        });
    }

    /**
     * 创建节点
     */
    @Test
    public void testCreateZNode() throws Exception {
        String path = "/test01";
        zooKeeper.exists(path, true);
        String ret = zooKeeper.create(path, "HELLO2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(ret);
    }

    @Test
    public void testSetZnode() throws KeeperException, InterruptedException {
        zooKeeper.setData("/test02", "uplooking02".getBytes(), 1);
    }


    @Test
    public void testGetZnode() throws KeeperException, InterruptedException {
        byte[] data = zooKeeper.getData("/test02", true, null);
        System.out.println(new String(data, 0, data.length));
    }

    @Test
    public void testDeleteZnode() throws KeeperException, InterruptedException {
        zooKeeper.delete("/test02", -1);
    }

    @After
    public void destory() throws Exception {
        zooKeeper.close();
    }
}

10 ZooKeeper客户端神器Curator

11 Curator中的组件

名称 描述
Recipes Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。
Framework Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities 为Zookeeper提供的各种实用程序。
Client Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
Errors Curator如何处理错误,连接问题,可恢复的例外等。

12 依赖POM

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {
        zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();
    }

    /**
     * 创建空节点(其实不是空节点,是给节点默认设置了ip地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {
        zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {
        zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {
        zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {
        zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {
        byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10 ZooKeeper客户端神器Curator

10.1 Curator简介

10.2 Curator中的组件

名称 描述
Recipes Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。
Framework Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities 为Zookeeper提供的各种实用程序。
Client Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
Errors Curator如何处理错误,连接问题,可恢复的例外等。

10.3 Curator的基本API操作

/*Curator的基本的节点操作*/
package com.uplooking.bigdata.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
    public String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
    private CuratorFramework zkClient;

    @Before
    public void init() {
        zkClient = CuratorFrameworkFactory.newClient(connStr, new RetryForever(6000));
        zkClient.start();
    }

    /**
     * 创建空节点(其实不是空节点,是给节点默认设置了ip地址)
     *
     * @throws Exception
     */
    @Test
    public void createZnode() throws Exception {
        zkClient.create().forPath("/test03");
    }

    @Test
    public void createZnode1() throws Exception {
        zkClient.create().forPath("/test02", "h".getBytes());
    }


    @Test
    public void deleteZnode() throws Exception {
        zkClient.delete().deletingChildrenIfNeeded().forPath("/test02");
    }

    @Test
    public void setZnode() throws Exception {
        zkClient.setData().forPath("/test03");
    }

    @Test
    public void getZnode() throws Exception {
        byte[] bytes = zkClient.getData().forPath("/test03");
        System.out.println(new String(bytes, 0, bytes.length));
    }
}

10.4 Curator的监听器

10.4.1 NodeCache监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
//创建监听监听器
NodeCache nodeCache = new NodeCache(zkClient, path);
nodeCache.start();

nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        if (nodeCache.getCurrentData() == null) {
            System.out.println("删除了节点...."+path);
        } else {
            System.out.println("节点改变.." + "路径为:" + nodeCache.getCurrentData().getPath() + "数据为:" + new String(nodeCache.getCurrentData().getData()));
        }
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();

10.4.2 PathChildrenCache监听器

String connStr = "uplooking03:2181,uplooking04:2181,uplooking05:2181";
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(connStr, retryPolicy);
zkClient.start();
String path = "/test01";
//创建监听监听器
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        System.out.println("一级子节点改变.."+event.getData()+ event.getType());
    }
});
Thread.sleep(Integer.MAX_VALUE);
zkClient.close();
上一篇 下一篇

猜你喜欢

热点阅读