spring

zookeeper实现分布式锁

2020-05-22  本文已影响0人  sunpy

1. 什么时候使用分布式锁

当多个系统分布在不同机器上时,系统之间需要以同步方式访问共享资源,而为了防止出现脏读,脏写,以及产生重复数据的情况等,使用互斥手段来保证数据的一致性,而使用分布式锁。

2. 实现分布式排他锁

思路:
(1)在zookeeper上创建指定/zk-spy持久化根节点
(2)在根节点下面创建临时顺序节点/zk-spy/lock_xxxxxx,从根节点获取下面所有的子节点,判断临时顺序节点是否为子节点中最小节点。是那么就获取了锁,不是就为前一个节点添加删除监听事件。并且让其当前线程进行等待。
(3)删除监听事件生效,那么解除等待,返回第二步重新进行操作,直到获取锁为止。
实现:
接口定义:

public interface IZkLock {

    // 建立连接
    public void connect(); 
    // 创建持久根节点
    public String createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException;
    // 尝试获取锁
    public boolean tryAcquire();
    // 获取锁
    public void acquire() throws Exception;
    // 阻塞等待
    public void acquireWait() throws Exception;
    // 删除锁
    public void release() throws Exception;
}

实现类:

public class ZkLock implements IZkLock{

    // 控制线程执行顺序
    private CountDownLatch cdl = new CountDownLatch(1);;
    // zookeeper连接地址 "XX.XX.XXX.XXX:2181"
    private String zkAddr;
    // 重连时间
    private int timeout;
    // 持久化根节点路径 "/zk-spy" 
    private String rootPath;
    // 当前节点路径
    private String curPath;
    // 前驱节点路径
    private String prevPath;
    
    private ZooKeeper zookeeper;
    
    public ZkLock(String zkAddr, int timeout, String rootPath) {
        this.zkAddr = zkAddr;
        this.timeout = timeout;
        this.rootPath = rootPath;
    }
    
    /**
     * 建立连接zookeeper
     */
    @Override
    public void connect() {
        try {
            zookeeper = new ZooKeeper(zkAddr, timeout, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 创建持久根节点
     */
    @Override
    public String createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException {
        Stat stat = zookeeper.exists(rootPath, false);
        
        if (stat != null) {
            return rootPath;
        } else {
            return zookeeper.create(rootPath, 
                    rootPath.getBytes("UTF-8"), 
                    Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT);
        }
    }
    
    /**
     * 尝试获取锁
     */
    @Override
    public boolean tryAcquire() {
        try {
            if (StringUtils.isBlank(curPath)) {
                String lockPath = rootPath + "/lock_";
                curPath = zookeeper.create(lockPath, 
                        lockPath.getBytes("UTF-8"), 
                        Ids.OPEN_ACL_UNSAFE, 
                        CreateMode.EPHEMERAL_SEQUENTIAL);
            }
            
            List<String> list = zookeeper.getChildren(rootPath, false);
            Collections.sort(list);
            
            // 当前节点就是最小节点
            if (curPath.equals(rootPath + "/" + list.get(0))) {
                return true;
            } else { // 如果当前节点不是最小节点,获取其前一个节点
                String partLockPath = curPath.substring(
                        curPath.lastIndexOf("/")+1, 
                        curPath.length());
                String prevNode = list.get(list.indexOf(partLockPath)-1);
                prevPath = rootPath + "/" + prevNode;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } 
        
        return false;
    }
    
    /**
     * 获取锁
     */
    @Override
    public void acquire() throws Exception {
        if (tryAcquire()) {
            System.out.println(Thread.currentThread().getName() + " ---> 获取锁成功了 curPath : " + curPath);
        } else {
            System.out.println(Thread.currentThread().getName() + " ---> 获取锁失败了,进入等待" );
            acquireWait();
            acquire();
        }
    }

    /**
     * 阻塞等待
     */
    @Override
    public void acquireWait() throws Exception {
        Stat stat = zookeeper.exists(prevPath, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                if (EventType.NodeDeleted == event.getType()) {
                    cdl.countDown();
                }
            }
        });
        
        if (stat != null) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 删除锁
     * @throws Exception
     */
    @Override
    public void release() throws Exception {
        zookeeper.delete(curPath, -1);
        zookeeper.close();
    }
}

测试:

public static void main(String[] args) throws InterruptedException {
        
        for (int i = 0; i < 20; i++) {
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    ZkLock zel = new ZkLock("XX.XX.XXX.XXX:2181", 4000, "/zk-spy");
                    zel.connect();
                    try {
                        zel.createRootNode();
                        zel.acquire();
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            zel.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            t.start();
        }
    }
Thread-9 ---> 获取锁失败了,进入等待
Thread-10 ---> 获取锁失败了,进入等待
Thread-13 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000302
Thread-11 ---> 获取锁失败了,进入等待
Thread-6 ---> 获取锁失败了,进入等待
Thread-16 ---> 获取锁失败了,进入等待
Thread-19 ---> 获取锁失败了,进入等待
Thread-8 ---> 获取锁失败了,进入等待
Thread-0 ---> 获取锁失败了,进入等待
Thread-1 ---> 获取锁失败了,进入等待
Thread-17 ---> 获取锁失败了,进入等待
Thread-14 ---> 获取锁失败了,进入等待
Thread-18 ---> 获取锁失败了,进入等待
Thread-3 ---> 获取锁失败了,进入等待
Thread-7 ---> 获取锁失败了,进入等待
Thread-4 ---> 获取锁失败了,进入等待
Thread-12 ---> 获取锁失败了,进入等待
Thread-15 ---> 获取锁失败了,进入等待
Thread-2 ---> 获取锁失败了,进入等待
Thread-5 ---> 获取锁失败了,进入等待
Thread-9 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000303
Thread-10 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000304
Thread-16 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000305
Thread-19 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000306
Thread-3 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000307
Thread-6 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000308
Thread-18 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000309
Thread-2 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000310
Thread-11 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000311
Thread-8 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000312
Thread-0 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000313
Thread-1 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000314
Thread-15 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000315
Thread-12 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000316
Thread-17 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000317
Thread-4 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000318
Thread-7 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000319
Thread-5 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000320
Thread-14 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000321

问题:

  1. 死循环的出现:
    发现第二个获取锁的线程,老是不停的获取锁失败,进入等待。
    程序没有判断当前的顺序临时节点创建的路径只有在空的情况下,才能创建,如果没有判断,总是创建新临时顺序节点来与根节点下面的最小孩子节点,总是不满足进入等待状态。加上我的递归调用,将出现死循环。

  2. 出现了空指针异常,在创建zookeeper的时候:

java.lang.NullPointerException: null
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:532)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:507)

由于我们默认传入的Watcher为null,而在执行监听事件时,会遍历Watcher,却不判断是否为null,来执行事件。
ClientCnxn.EventThread.processEvent方法:

          for (Watcher watcher : pair.watchers) {
              try {
                          watcher.process(pair.event);
                  } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                  }
           }

所以在zookeeper建立连接的时候,需要创建一个空的Watcher。

  1. 多线程执行createRootNode()方法出现NodeExistsException异常。当第一个线程进入方法,发现该节点不存在,创建持久节点时,第二个线程进入方法也方法该节点不存在,创建持久节点,抛出该异常。
    解决:捕获这个异常,发现该节点存在,就直接返回即可。
    public void createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException {
        Stat stat = zookeeper.exists(rootPath, false);
        
        if (stat != null) {
            return;
        } else {
            try {
                zookeeper.create(rootPath, 
                        rootPath.getBytes("UTF-8"), 
                        Ids.OPEN_ACL_UNSAFE, 
                        CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                return;
            }
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读