Java 杂谈zookeeper面试专题

分布式锁之Zookeeper实现

2019-02-15  本文已影响9人  清幽之地

年前临放假我们一起学习了在Redis中如何实现分布式锁,过年期间在家里好吃好喝好玩完毕,现在,该继续学习啦!

不知诸位还是否记得上次我们说的《沙滩 - 脚印》那个例子,在Zookeeper中,实现分布式锁原理也差不多。如果你不知道,快回头先看看分布式锁之Redis实现

如果您对zookeeper还不熟悉,需要先去了解相关背景知识。

一、Zookeeper特性

在开始之前,我们重温一下zookeeper中的一些概念性知识。

1、数据节点

zookeeper的视图结构和文件系统类似,存储于内存。其中, 每个节点称为数据节点znode。每个znode可以存储数据,也可以挂载子节点。

比如在Dubbo中,我们将服务的信息注册到zookeeper中。就是由一个个的节点和子节点组成。

[zk: localhost:2181(CONNECTED) 1] ls /dubbo/com.viewscenes.netsupervisor.service.InfoUserService 
[consumers, configurators, routers, providers]
[zk: localhost:2181(CONNECTED) 2] 

或者,我们将它理解为Windows系统中的文件夹,意思差不多的。

2、Watcher

Watcher(事件监听器),我们可以注册watcher监控znode的变化。比如znode删除、数据发生变化、子节点发生变化等。当触发这些事件时,客户端就会得到通知。
Watcher机制是Zookeeper实现分布式协调服务的重要特性。

3、节点类型

在zookeeper中,数据节点有着不同的类型。

持久节点,一旦被创建,除非主动进行删除操作,否则这个节点会一直保持。
临时节点,与客户端session绑定,一旦session失效,节点就会被自动删除。

有个重要的信息是,对于持久节点和临时节点,同一个znode下,节点的名称是唯一的! 就像在windows中,我们不可能在同一目录,创建两个相同名字的文件夹。记住它,这个实现分布式锁的基础。

二、实现

基于上面zookeeper的特性,对于分布式锁,我们就可以梳理出这样一条思路。

1、加锁,申请创建临时节点。
2、创建成功,则加锁成功;完成自己的业务逻辑后,删除此节点,释放锁。
3、创建失败,则证明节点已存在,当前锁被别人持有;注册watcher,监听数据节点的变化。
4、监听到数据节点被删除后,证明锁已被释放。重复步骤1,再次尝试加锁。

1、初始化

在构造方法中,我们先将zookeeper的客户端和锁的节点路径设置一下。

public class ZookeeperLock implements Lock {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    private String root_path = "/zookeeper";
    private String current_path;
    private ZooKeeper zooKeeper;

    public ZookeeperLock(String lock_name,ZooKeeper zooKeeper){
        current_path = root_path+"/"+lock_name;
        this.zooKeeper = zooKeeper;
    }
}

2、加锁

上面我们说了,加锁就是创建节点的过程。代码也很简单。

public class ZookeeperLock implements Lock {
 
    /**
     * 加锁
     * 失败后调用waitForRelease方法等待。
     */
    public void lock() {
        if (tryLock()){
            logger.info("获取锁成功!");
        }else {
            waitForRelease();
        }
    }
    /**
     * 尝试加锁
     * 创建临时节点,创建成功则加锁成功,返回true
     * @return
     */
    public boolean tryLock() {
        try {
            zooKeeper.create(current_path, "1".getBytes(), 
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            logger.info("创建节点成功!");
            return true;
        } catch (Exception e) {
            logger.error("创建节点失败!{}",e.getMessage());
        }
        return false;
    }
}

3、等待锁释放

waitForRelease就是等待锁释放的方法。这里先判断一次锁的数据节点是否还存在,如果不存在,再次调用lock方法尝试加锁;如果存在,则通过countDownLatch来等待,直到触发NodeDeleted事件。

public class ZookeeperLock implements Lock {
 
    /**
     * 注册Watcher 监听znode节点删除事件
     */
    private void waitForRelease(){
        CountDownLatch countDownLatch = new CountDownLatch(1);
        
        Watcher watcher = watchedEvent -> {
            Watcher.Event.EventType type = watchedEvent.getType();
            //触发NodeDeleted事件,跳过await方法
            if (Watcher.Event.EventType.NodeDeleted.equals(type)){
                countDownLatch.countDown();
            }
        };
        try {
            //判断当前锁的数据节点是否存在
            Stat exists = zooKeeper.exists(current_path, watcher);
            if (exists==null){
                lock();
            }else {
                countDownLatch.await();
                lock();
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4、释放锁

我们把锁的数据节点删除之后,就释放了锁。

public class ZookeeperLock implements Lock {
    public void unlock() {
        try {
            zooKeeper.delete(current_path,-1);
        } catch (Exception e) {
            logger.error("删除节点失败:{}",e.getMessage());
        }
        logger.info("释放锁成功!");
    }
}

5、测试

public class ZkTest1 {

    static final Logger logger = LoggerFactory.getLogger(ZkTest1.class);
    static final int thread_count = 100;
    static int num = 0;

    public static void main(String[] args) throws IOException, InterruptedException {

        CountDownLatch c1 = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper("192.168.139.131:2181", 99999, watchedEvent -> {
            if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                c1.countDown();
            }
        });
        c1.await();

        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(thread_count);
        CountDownLatch countDownLatch = new CountDownLatch(thread_count);
        for (int i=0;i<thread_count;i++){
            executorService.execute(() -> {
                Lock lock = new ZookeeperLock("lock",zooKeeper);
                try {
                    lock.lock();
                    num++;
                }finally {
                    if (lock!=null){
                        lock.unlock();
                    }
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("线程数量为:{},统计NUM数值为:{},共耗时:{}",thread_count,num,(end-start));
        executorService.shutdown();
    }
}

大家可以运行测试一下,重点可以看看输出的日志,来帮助我们理解整个代码流程。这种方式有很大的性能问题,当请求数高了之后,会变得非常非常慢。

15:07:51.713 [main] INFO  - 线程数量为:100,统计NUM数值为:100,共耗时:1625
15:07:25.056 [main] INFO  - 线程数量为:1000,统计NUM数值为:1000,共耗时:125062

如上,在笔者虚拟机环境中的测试结果。问题出在哪呢?

三、改进版实现

我们说上面的代码有较大的性能问题,事实上造成这种问题的原因,有个专业名词:惊群效应。

惊群问题是计算机科学中,当许多进程等待一个事件,事件发生后这些进程被唤醒,但只有一个进程能获得CPU执行权,其他进程又得被阻塞,这造成了严重的系统上下文切换代价。

就好比,一只凶恶的大灰狼,跑进羊群。虽然一次只会有一只羊被吃掉,但可爱的羊儿们为了保住自己的小命,都会四处奔逃。没有被吃掉的羊儿,继续埋头吃草...直到下一只狼的到来,历史总是惊人的相似。

回到我们上面的代码中,多个线程都会注册Watcher事件,等待锁释放;当触发数据节点删除事件,线程被唤醒,然后争先抢后的尝试加锁。只有一个线程加锁成功,其余的线程继续阻塞,等待唤醒...

我们怎么改进这个问题呢?

1、临时顺序节点

上面我们说zookeeper数据节点分为4个类型,其中有一个就是临时顺序节点。

首先,它是一个临时节点;其次,它的节点名称是有顺序的。比如,我们在/lock节点创建几个临时顺序节点,它看起来是这样的:

[zk: localhost:2181(CONNECTED) 2] create -s -e /lock/test 1234
Created /lock/test0000000230
[zk: localhost:2181(CONNECTED) 3] create -s -e /lock/test 1234
Created /lock/test0000000231
[zk: localhost:2181(CONNECTED) 4] create -s -e /lock/test 1234
Created /lock/test0000000232
[zk: localhost:2181(CONNECTED) 5] create -s -e /lock/test 1234
Created /lock/test0000000233
[zk: localhost:2181(CONNECTED) 6] create -s -e /lock/test 1234
Created /lock/test0000000234
[zk: localhost:2181(CONNECTED) 7] create -s -e /lock/test 1234
Created /lock/test0000000235
[zk: localhost:2181(CONNECTED) 8] ls /lock
[test0000000230, test0000000231, test0000000232, test0000000233, test0000000234, test0000000235]
[zk: localhost:2181(CONNECTED) 9] 

可以看到,我们创建的/lock/test节点,都被加上了一个自增的序号。比如test0000000230、test0000000231这样。这个自增序号是zookeeper内部机制保证的,我们暂且不用管它怎么生成。

2、实现原理

基于zookeeper临时顺序节点的特性,针对惊群效应,业界又改进了一种实现方法。它的思路是这样的:

1、加锁,在/lock锁节点下创建临时顺序节点并返回,比如:test0000000235
2、获取/lock节点下全部子节点,并排序。
3、判断当前线程创建的节点,是否在全部子节点中顺序最小。
4、如果是,则代表获取锁成功;完成自己的业务逻辑后释放锁。
5、如果不是最小,找到比自己小一位的节点,比如test0000000234;对它进行监听。
6、当上一个节点删除后,证明前面的客户端已释放锁;然后尝试去加锁,重复以上步骤。

3、实现

初始化

public class ZkClientLock implements Lock {

    private Logger logger = LoggerFactory.getLogger(ZkClientLock.class);
    private String lock_path = "/lock";
    private ZkClient client;
    private CountDownLatch countDownLatch;

    private String beforePath;// 当前请求的节点前一个节点
    private String currentPath;// 当前请求的节点

    public ZkClientLock(ZkClient client){
        this.client = client;
    }
}

加锁

我们看加锁的过程,主要是判断自己是否处在全部子节点的第一位,是的话加锁成功;否则找到自己的上一个节点,调用waitForRelease方法等待锁释放。

public class ZkClientLock implements Lock {

    /**
     * 加锁
     * 如未成功获取锁,则等待锁释放后再次尝试加锁
     */
    public void lock() {
        if (tryLock()){
            logger.info("获取锁成功");
        }else {
            waitForRelease();
            lock();
        }
    }
    /**
     * 当前节点排在全部子节点的第一位,则加锁成功
     * 否则找出前一个节点
     * @return
     */
    public boolean tryLock() {
        if (currentPath==null || currentPath.length()==0){
            currentPath = client.createEphemeralSequential(lock_path+"/","lock");
            logger.info("当前锁路径为:{}",currentPath);
        }
        //获取全部子节点并排序
        List<String> children = client.getChildren(lock_path);
        Collections.sort(children);

        //当前节点如果排在第一位,返回成功
        if (currentPath.equals(lock_path+"/"+children.get(0))){
            return true;
        }else {
            //找出上一个节点
            String sequenceNodeName = currentPath.substring(lock_path.length()+1);
            int i = Collections.binarySearch(children, sequenceNodeName);
            beforePath = lock_path + '/' + children.get(i - 1);
        }
        return false;
    }
}

等待锁释放

这里就是对上一个节点进行监听,节点被删除后,方法返回。

public class ZkClientLock implements Lock {

    /**
     * 等待锁释放
     * 对上一个节点进行监听,节点删除后返回
     */
    private void waitForRelease(){

        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }
}

释放锁

public void unlock() {
    client.delete(currentPath);
    logger.info("释放锁成功");
}

这种方式改进之后,性能会得到大幅度提升。同样的测试方法,得到结果如下:

15:19:19.327 [main] INFO  - 线程数量为:100,统计NUM数值为:100,共耗时:636
15:18:58.728 [main] INFO  - 线程数量为:1000,统计NUM数值为:1000,共耗时:5398

四、Curator

我们上面写的两个锁实现,并不能用于生产环境中。还需要考虑很多细节才行,比如锁的可重入性、锁超时。
Curator是什么,想必不用再介绍。如果在生产环境中,使用到zookeeper分布式锁,笔者推荐使用开源组件,除非自己写的比人家的要好,哈哈。
首先,引入它的Maven坐标。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

1、用法

Curator帮我们封装了zookeeper分布式锁的实现逻辑,用起来非常简单。

首先,连接到zookeeper客户端。然后创建一个互斥锁的实例,调用即可。

public static void main(String[] args) throws Exception {

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client =
            CuratorFrameworkFactory.newClient(
                    "192.168.139.131:2181",
                    999999,
                    3000,
                    retryPolicy);
    client.start();
    
    InterProcessMutex lock = new InterProcessMutex(client, "/lock");
    //加锁
    lock.acquire();
    //解锁
    lock.release();
}

用同样的测试方法,得到结果如下:

15:43:34.306 [main] INFO  - 线程数量为:100,统计NUM数值为:100,共耗时:606
15:43:02.427 [main] INFO  - 线程数量为:1000,统计NUM数值为:1000,共耗时:6785

2、InterProcessMutex

我们先看下InterProcessMutex类有哪些属性。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

    //内部锁的实例对象
    private final LockInternals internals;
    //锁的基本路径
    private final String basePath;
    //线程和锁的映射
    private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
    //锁的名称前缀
    private static final String LOCK_NAME = "lock-";
}

这里的重点是threadData,它是一个ConcurrentMap对象,保存着当前线程和锁的映射关系,是实现可重入锁的基础。我们再看下LockData这个类。

private static class LockData {

    //当前线程
    final Thread owningThread;
    //当前锁的节点
    final String lockPath;
    //锁的次数
    final AtomicInteger lockCount;
    private LockData(Thread owningThread, String lockPath) {
        this.lockCount = new AtomicInteger(1);
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

接着再看InterProcessMutex的构造方法。

public InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}

StandardLockInternalsDriver是锁的驱动类,它主要就是创建锁的节点和判断当前节点是不是处于第1 位。接着看,就是初始化一些属性。

InterProcessMutex(CuratorFramework client, String path, String lockName, 
                int maxLeases, LockInternalsDriver driver) {
                
    this.threadData = Maps.newConcurrentMap();
    this.basePath = PathUtils.validatePath(path);
    this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

3、加锁

加锁的方式有两种,有超时时间的和不带超时时间的。

lock.acquire();
lock.acquire(5000, TimeUnit.SECONDS);

不过没关系, 它们都会调用到internalLock方法。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
        
    //加锁
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        
        //当前线程
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = this.threadData.get(currentThread);
        //当前线程有锁的信息,锁次数加1,返回
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            //尝试加锁,返回当前锁的节点路径
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                //将当前线程和锁的关系缓存到lockData
                InterProcessMutex.LockData newLockData = 
                            new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }
}

以上代码我们分为两部分来看。

然后我们接着看attemptLock方法,它是如何尝试加锁的。

public class LockInternals {

    //尝试加锁
    //如果加锁成功,则返回当前锁的节点路径
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        //方法执行开始时间
        long startMillis = System.currentTimeMillis();
        //锁超时时间
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        //锁节点数据
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0; 
        //是否已经持有锁
        boolean hasTheLock = false;
        boolean isDone = false;
        while(!isDone) {
            isDone = true;      
            try {
                //创建锁
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                //加锁
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().
                        allowRetry(retryCount++, System.currentTimeMillis() - startMillis, 
                            RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }
        return hasTheLock ? ourPath : null;
    }
}

以上代码看着多,其实也不复杂,如果已经持有锁,就返回锁的节点路径。我们重点看两个地方。

创建锁就是在zookeeper中创建临时顺序节点,返回锁的节点名称。

public class StandardLockInternalsDriver implements LockInternalsDriver {
    
    //创建锁
    public String createsTheLock(CuratorFramework client, 
                String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        if (lockNodeBytes != null) {
            //lockNodeBytes默认为空
        } else {
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().
                creatingParentContainersIfNeeded().
                withProtection().
                withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).
                forPath(path);
        }
        return ourPath;
    }
}

创建后锁的节点路径 = UUID + lock + zookeeper自增序列。它看起来是这样的:
_c_d5815293-f5b1-484d-b789-9f11df69149c-lock-0000006168

创建锁的节点之后,不断的循环去加锁,直到加锁成功或者锁超时退出。

public class LockInternals {
    
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        //是否已经持有锁
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
            //如果没有持有锁,就一直循环
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {    
                //获取所有子节点并排序
                List<String> children = this.getSortedChildren();
                //获取当前锁的自增序列
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                //判断当前锁是否处于第1位
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, 
                                                    children, sequenceNodeName, this.maxLeases);
                                                    
                //处于第1位,返回true
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                    //获取上一个序列路径
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            //对上一个序列路径进行监听,并等待
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }
        return haveTheLock;
    }
}

以上代码比较长,但逻辑比较清晰。我们重点看while循环内的代码。

4、解锁

既然是可重入锁,解锁的时候必然先判断锁的重入次数。当次数为0时,删除zookeeper中的锁节点信息等。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

    //解锁
    public void release() throws Exception {
        
        //获取当前线程的锁相关信息lockData
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = this.threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
            //锁次数递减1
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("....");
                } else {
                    try {
                        //移除Watch、删除锁节点、移除线程和锁的映射关系 
                        this.client.removeWatchers();
                        this.revocable.set((Object)null);
                        this.deleteOurPath(lockPath);
                    } finally {
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }
}

至此,Curator中关于互斥锁的实现逻辑我们已经分析完了。可以看到,它的实现原理跟我们自己改进后的代码实现基本上是相同的。不过,多了锁的可重入和锁超时。

上一篇下一篇

猜你喜欢

热点阅读