使用Zookeeper的Watcher实现分布式锁

2021-04-06  本文已影响0人  Wannay
package com.wanna.zk.zkstudy;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

/**
 * 利用zk实现分布式锁思路:使用临时顺序节点
 * 1.在执行加锁之前,先判断/LOCKS有没有被创建,如果没有则先创建
 * 2.如果创建了/LOCKS,则创建/LOCKS/LOCK_xxxxxxxx这样子的临时顺序节点
 * 3.尝试去加锁的逻辑,先获取/LOCKS/下所有的孩子节点,并进行排序
 * ---3.1如果当前锁在的位置是0,也就是第一个元素,那么就获取锁成功,开始执行业务代码
 * ---3.2如果当前锁所在的位置不是0,那么就获取它的前一个节点,并进行监控
 * ------如果前一个节点已经被删了,那么就继续尝试去获取锁
 * ------如果前一个节点还没被删,那么就阻塞当前线程,直到监听器(Watcher)对象将其唤醒才继续执行
 * 4.解锁逻辑,删除自己创建的LOCK这条记录,因为是临时顺序节点,因此就算机器宕机了也会自动删除锁
 */
public class ZKLock {
    private static final String LOCK_ROOT = "/LOCKS";   //锁的根路径
    private static final String LOCK_NODE_NAME = "LOCK_";  //锁的名称,使用临时顺序节点
    private String lockPath;  //完整的锁路径
    //集群的连接字符串,机器ip:port中间用逗号分隔即可
    String connectString = "localhost:2181,localhost:2182,localhost:2183";  
    private ZooKeeper zooKeeper;  //zk对象
    private final Thread currentThread = Thread.currentThread();  //获取创建对象的线程,用来进行唤醒和阻塞
    private int zkSessionTimeout = 5000;  //sessionTimeout

    //判断某个元素是否被删除,如果删除了就将当前线程唤醒
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //如果获取到删除节点的事件,那么就唤醒当前线程
            if (event.getType() == Event.EventType.NodeDeleted) {
                LockSupport.unpark(currentThread);  //将当前线程唤醒
            }
        }
    };

    public ZKLock() {
        try {
            zooKeeper = new ZooKeeper(connectString, zkSessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.None) {
                        //如果链接建立成功,打印相关信息
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            System.out.println("ZK连接建立成功");
                            LockSupport.unpark(currentThread);
                        }
                    }
                }
            });
            LockSupport.park();   //因为是异步去进行连接,因此这里需要等待
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        try {
            createLock();  //createLock
            attemptLock();  //尝试去获取锁
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void createLock() throws KeeperException, InterruptedException {
        //判断LOCKS节点是否存在
        final Stat exists = zooKeeper.exists(LOCK_ROOT, false);
        //如果不存在则创建一个/LOCKS的持久节点
        if (exists == null) {
            zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        //创建临时有序节点,节点为/LOCKS/LOCK_xxxxxxxx
        lockPath = zooKeeper.create(LOCK_ROOT.concat("/").concat(LOCK_NODE_NAME), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("LOCK NODE ".concat(lockPath).concat(" has been created"));
    }

    //尝试去获取锁
    public void attemptLock() throws KeeperException, InterruptedException {
        //获取/LOCKS节点的孩子节点
        final List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
        //对children节点进行排序
        Collections.sort(children);
        //获取当前锁节点排序之后的下标,截取掉/LOCKS/之后的内容
        final int index = children.indexOf(lockPath.substring(LOCK_ROOT.length() + 1));
        //如果获取到的index为0,也就是第一个元素
        if (index == 0) {
            System.out.println("获取LOCK成功");
            return;
        }
        //如果不是第一个元素,获取上一个节点的路径
        final String s = children.get(index - 1);
        //监视它的上一个元素的变化情况,传入watcher对象
        final Stat exists = zooKeeper.exists(LOCK_ROOT.concat("/").concat(s), watcher);
        //继续去获取锁
        if (exists != null) {  //如果exists不为null,那么就阻塞,不然就尝试去获取锁
            LockSupport.park();
        }
        attemptLock();
    }

    public void unlock() {
        //删除锁的临时有序节点,并且关闭连接对象
        try {
            zooKeeper.delete(lockPath, -1);
            zooKeeper.close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读