ZK实现的分布式锁

2020-06-06  本文已影响0人  悠扬前奏

工作需要。写了一个基于ZK的分布式锁,记录一下:

原理

zk能保证集群上的路径同一时刻只有一个客户端来创建。因此,通过在集群上顺序创建和删除临时路径,在实现分布式锁的获取和释放。

代码

zk上有一个客户端框架Curator已经对分布式互斥锁进行了封装,几乎是开箱即用:

public class ZKCuratorManager {
    private static InterProcessMutex lock;
    private static CuratorFramework cf;
    private static String zkAddr = "*.*.*.*:2181";
    private static String lockPath = "/distribute-lock";

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(zkAddr)
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    public static InterProcessMutex getLock() {
        lock = new InterProcessMutex(cf, lockPath);
        return lock;
    }
}
public class ZKCuratorLockUtil {

    /**
     * 从配置类中获取分布式锁对象
     */
    private static InterProcessMutex lock = ZKCuratorManager.getLock();

    /**
     * 加锁
     *
     * @return
     */
    public static boolean acquire() {
        try {
            lock.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 锁的释放
     */
    public static void release() {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ZkLockTest {

    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier = new CyclicBarrier(N);
        for (int i = 0; i < N; i++) {

            new WriterTest(barrier).start();
        }

        System.out.println("END");
    }

    static class WriterTest extends Thread {
        private CyclicBarrier cyclicBarrier;

        public WriterTest(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
            try {
                //以睡眠来模拟写入数据操作
                Thread.sleep(5000);
                System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
            //加锁
            ZKCuratorLockUtil.acquire();
            System.out.println("线程" + Thread.currentThread().getName() + "获得分布式锁");
            try {
                Thread.sleep(2000);
                ZKCuratorLockUtil.release();
                System.out.println("线程" + Thread.currentThread().getName() + "释放分布式锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("END");
        }
    }
}
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
END
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
线程Thread-3获得分布式锁
线程Thread-3释放分布式锁
END
线程Thread-1获得分布式锁
线程Thread-1释放分布式锁
END
线程Thread-2获得分布式锁
线程Thread-2释放分布式锁
END
线程Thread-0获得分布式锁
线程Thread-0释放分布式锁
END
上一篇下一篇

猜你喜欢

热点阅读