zookeeper-zkclinet,curator,简单锁

2019-03-23  本文已影响0人  xc2023

zkclient

1),可以递归创建
2),可以递归删除

创建一个单机的zookeeper

systemctl stop firewalld#关闭防火墙
docker pull zookeeper:3.4 #拉取镜像
docker run -d --name=zookeeper -p 2181:2181 zookeeper:3.4 #创建容器
docker exec -it zookeeper /bin/bash#进入容器
zkCli.sh #进入zookeeper客户端

依赖

<dependency>
            <groupId>com.github.sgroschupf</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>

zkclientApi

package com.demo.service;

import com.demo.config.Person;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.List;

public class ZkService {

    public static ZkClient zkClient =
            new ZkClient("192.168.116.150:2181", 5000, 5000, new SerializableSerializer());

    /**
     *  创建节点
     * path 节点路径
     * data 数据 ,可谓null
     * mode 4种类型 Ephemeral 临时的会话 persistent 永久的会话
     * acl acl策略
     * callback 注册一个异步回调函数
     * context 传递一个对象
     * createParents 指定是否创建父节点
     */
    public static void create() {
        zkClient.create("/test1","zkclient_test", CreateMode.EPHEMERAL);
        zkClient.createEphemeral("/test2");
        zkClient.createPersistent("/person",new Person("xiaoming","123456"));
        zkClient.createPersistent("/hello/c1",true);
    }

    /**
     * 删除
     */
    public static void delete() {
        zkClient.delete("/person/xiaoming"); //删除节点
        //zkClient.deleteRecursive("/testzoo1");//删除节点和子节点
    }

    /**
     * 修改
     */
    public static void update() {
        //zkClient.writeData("/testzoo3","hello");//写数据,覆盖原来的值
    }

    /**
     * 是否存在
     */
    public static void exists() {
        zkClient.exists("/Person");
    }

    /**
     * 读取节点的值
     * 对象要实现序列化接口
     */
    public static void select() {
        Stat stat = new Stat(); //节点的信息
        Person person = zkClient.readData("/person", stat);
        System.out.println(stat);
        System.out.println(person);
    }

    /**
     * 注册监听会开启一个新的线程来处理,无需自己在开一条线程单独注册
     *  监听接口                    注册监听方法                                  解除监听
     * IZkChildListener监听子节点     ZkClient的subscribeChildChanges方法        ZkClient的unsubscribeChildChanges方法
     * IZkDataListener 监听数据的变化     ZkClient的subscribeDataChanges方法         ZkClient的subscribeDataChanges方法
     * IZkStateListener监听服务状态的状态     ZkClient的subscribeStateChanges方法        ZkClient的unsubscribeStateChanges方法
     */
    public static void listen() {
        zkClient.subscribeChildChanges("/person", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentpath" + parentPath + "--" + currentChilds);
            }
        });
    }
    /**
     * 监听节点数据的变化
     * */
    public static void listen(String string) {
        zkClient.subscribeDataChanges("/person", new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                //变化触发
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //删除时触发
            }
        });
    }
    /**
     * 添加数据
     */
    public static void main(String[] args) throws InterruptedException {
        ZkService.select();

        Thread.sleep(60 * 1000);
    }
}
zoo1.png
zoo2.png

zkclient分布式锁
有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。

流程

package com.demo.lock;

public interface BaseLock {

    /**
     * 获取锁
     * */
    boolean getlock();

    /**
     * 释放锁
     * */
    void unlock();
}
public class BaseLockImpl implements BaseLock {

    private static final String ZOOKEEPER_IP_PORT = "192.168.116.150:2181";
    private static final String LOCK_PATH = "/LOCK";
    private CountDownLatch countDownLatch;
    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
    private String beforePath;
    private String currentPath;

    // 判断有没有LOCK目录,没有则创建
    public BaseLockImpl() {
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    @Override
    public boolean getlock() {
        if (tryLock()) {
            System.out.println("=======================获取锁");
            return true;
        } else {
            waitForLock();
            return getlock();
        }
    }

    @Override
    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);
        System.out.println("======删除节点=================");
    }

    /**
     * 创建节点
     */
    public Boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
            List<String> childrens = this.client.getChildren(LOCK_PATH);
            //由小到大排序所有子节点
            Collections.sort(childrens);
            //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
            if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
                return true;
            }
            //监听前一个节点
            else {
                int wz = Collections.binarySearch(childrens, currentPath.substring(6));
                beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
                return false;
            }
        }
        return true;
    }

    //等待锁,对次小节点进行监听
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        // 对次小节点进行监听,排在前面的的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }

业务类

public class Order {

    // 自增长序列
    private static int i = 0;

    // 按照规则生成订单编号
    public synchronized String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }
}

测试10个并发

public class LocalService implements Runnable{

    private static Order order = new Order();
    // 同时并发的线程数
    private static final int num = 10;
    // 按照线程数初始化倒计数器,倒计数器
    //保证线程同时执行
    private static CountDownLatch cdl = new CountDownLatch(num);
    private BaseLock baseLock = new BaseLockImpl();

    public void createOrder(){
        String orderCode = null;
        baseLock.getlock();
        try {
            orderCode = order.getOrderCode();
            System.out.println(orderCode);
        }catch (Exception e){
            //todo
        }finally {
            baseLock.unlock();
        }
    }
    @Override
    public void run() {
        try {
            cdl.await();
        }catch (Exception e){
            e.printStackTrace();
        }
        //创建订单
        createOrder();
    }
    public static void main(String[] args) {
        for (int i = 1; i <= num; i++) {
            // 按照线程数迭代实例化线程
            new Thread(new LocalService()).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}
zoo3.png
zoo4.png

curator

curator是连接ZK应用最广泛的工具
zk分布式锁,Master选举等等,curator包含了这些场景。
依赖

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

curator对节点的增删改查,事件监听

public class CuratorService {


    public static CuratorFramework client = null;
    // 集群模式则是多个ip:port,ip:port
    public static final String zkServerIps = "192.168.116.150:2181";
    //缓存节点,监听节点数据变动
    final static NodeCache nodeCache = new NodeCache(client,"/path");
    // 为子节点添加watcher
    // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
    final static PathChildrenCache childrenCache = new PathChildrenCache(client, "/path", true);

    /**
     * basesleeptimems 初始化sleep的时间
     * maxretries 最大重试次数
     * maxsleeoms 最大重试时间
     */
    public CuratorService() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerIps)
                .sessionTimeoutMs(10000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        System.out.println("qi dong ke hu duan ...");

    }

    private void close() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorService curatorService = new CuratorService();
        // 创建节点
        String nodePath = "/super/testNode";  // 节点路径
        byte[] data = "this is a test data".getBytes();  // 节点数据
        String result = curatorService.client.create().creatingParentsIfNeeded()  // 创建父节点,也就是会递归创建
                .withMode(CreateMode.PERSISTENT)  // 节点类型
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 节点的acl权限
                .forPath(nodePath, data);

        System.out.println(result + "节点,创建成功...");
        // 更新节点数据
        byte[] newData = "this is a new data".getBytes();
        Stat resultStat = curatorService.client.setData().withVersion(0)  // 指定数据版本
                .forPath(nodePath, newData);  // 需要修改的节点路径以及新数据
        // 删除节点
        curatorService.client.delete()
                .guaranteed()  // 如果删除失败,那么在后端还是会继续删除,直到成功
                .deletingChildrenIfNeeded()  // 子节点也一并删除,也就是会递归删除
                .withVersion(resultStat.getVersion())
                .forPath(nodePath);
        Thread.sleep(1000);
        // 读取节点数据
        Stat stat = new Stat();
        byte[] nodeData = curatorService.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
        System.out.println("该节点的数据版本号为:" + stat.getVersion());
        // 获取子节点列表
        List<String> childNodes = curatorService.client.getChildren().forPath(nodePath);
        System.out.println(nodePath + " 节点下的子节点列表:");
        // 查询某个节点是否存在,存在就会返回该节点的状态信息,如果不存在的话则返回空
        Stat statExist = curatorService.client.checkExists().forPath(nodePath);
        if (statExist == null) {
            System.out.println(nodePath + " 节点不存在");
        } else {
            System.out.println(nodePath + " 节点存在");
        }
        for (String childNode : childNodes) {
            System.out.println(childNode);
        }
        //缓存节点的数据
        curatorService.nodeCache.start(true);
        curatorService.nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                // 防止节点被删除时发生错误
                if (curatorService.nodeCache.getCurrentData() == null) {
                    System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
                    return;
                }
                // 获取节点最新的数据
                String data = new String(curatorService.nodeCache.getCurrentData().getData());
                System.out.println(curatorService.nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
            }
        });
        /**
         * 监听子节点初始化的方式
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         * */
        curatorService.childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通过判断event type的方式来实现不同事件的触发
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                    //if (event.getData().getPath().trim().equals(NODE_PATH)) {}
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                    System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                    System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                }
            }
        });
        // 关闭客户端
        curatorService.close();
    }

}

分布式锁

上一篇下一篇

猜你喜欢

热点阅读