分布式锁

2019-10-08  本文已影响0人  Audience0

实现分布式锁的方式
一.基于数据库实现分布式锁
1、在数据库新建一张锁表(lock)
create table lock (
id
method_name (唯一约束)
.......
)
2、获取锁时向表中插入一条数据,由于有唯一约束,只会有一个线程插入成功,插入成功的线程获得锁,可以继续操作,没有插入成功的线程没有获得锁,不能操作;
3、解锁时删除该条数据;
主要问题:
可用性差,数据库故障会导致业务系统不可用;
数据库性能存在瓶颈,不适合高并发场景;
删除锁失败容易造成死锁;
锁的失效时间难以控制;

第二种:基于Redis实现分布式锁
加锁:
setnx命令加锁,并设置锁的有效时间和持有人标识;
expire命令设置锁的过期时间;
解锁:
检查是否持有锁,然后删除锁;
delete命令删除锁
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
基于Redis实现分布式锁,采用一个开源项目http://redisson.org

第三种:基于zookeeper实现分布式锁
zookeeper是分布式系统的协调服务,提供配置管理、分布式协同、命名的中心化服务、服务注册发现等;
zookeeper实现分布式锁采用其提供的临时有序节点+监听来实现;
Curator客户端给我们提供了现成的分布式互斥锁来实现分布式锁;
1、创建分布式互斥锁
InterProcessMutex lock = new InterProcessMutex(zookeeperCuratorClient.client, "/storeLock");

2、获取分布式互斥锁
if (lock.acquire(10, TimeUnit.SECONDS))
3、释放分布式互斥锁
lock.release();


curator客户端分布式锁


public class lockService {
    @Autowired
    private ZookeeperDao dao;
    InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");

    public  void loclMethod(){

        try {
            //获取锁
            //一直等待锁
            //lock.acquire();
            //尝试获取锁,如果在指定时间获取锁,则返回true
            if (lock.acquire(1000, TimeUnit.SECONDS)){
                int check = dao.check();
                if (check > 0){
                    System.out.println("售出");
                    dao.des(--check);
                }else {
                    System.out.println("没有库存");
                }
            }

            //Thread.sleep(50);
        } catch (Exception e) {
            System.out.println(e);
        }finally {
            try {
                //释放锁
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

zookeeper原生客户端分布式锁实现

/**
 * zookeeper原生客户端实现分布式锁
 * 基于有序临时节点
 */
public class ZookeeperLock {

    private ZooKeeper zooKeeper;
    private static final String ZK_ADDRESS = "127.0.0.1";
    //锁根节点
    private static String rootName = "/zklock";
    //锁节点
    private String lockName;
    //当前锁节点名称
    private String cuurentLockName;
    //zookeeper 超时时间
    private int sessionTimeout = 100000;

    private CountDownLatch countDownLatch = new CountDownLatch(1);
    public ZookeeperLock(String lockName) {
        //锁节点名称 初始化
        this.lockName = lockName;
        //zookeeper初始化
        try{
            zooKeeper  = new ZooKeeper(ZK_ADDRESS, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {

                    if (event.getState() == Event.KeeperState.SyncConnected){
                        //zookeeper连接上了
                        countDownLatch.countDown();
                    }
                }
            });
            //等待zookeeper连接成功
            countDownLatch.await();

            //创建一个根节点/lock
            if (zooKeeper.exists(rootName,false) == null){
                zooKeeper.create(rootName,"rootLock".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }



        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    /**
     * zookeeper加锁
     * 获取分布式锁
     */
    public void lock(){
        try {
            String node = zooKeeper.create(rootName + "/" + lockName,
                    "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            //拿到根节点下的子节点  临时有序的子节点
            List<String> children = zooKeeper.getChildren(rootName, false);
            TreeSet<String> childNodes = new TreeSet<>();
            //childNodes.addAll(children);
            for (String c : children){
                childNodes.add(rootName+ "/" +c);
            }
            //排好顺序的节点,取出最小的
            String minNode = childNodes.first();
            //获取指定节点的前一个节点
            String preNode = childNodes.lower(node);

            //判断是不是最小节点
            if (node.equals(minNode)) {
                //当前进来的这个线程锁创建的节点就是分布式锁节点
                 cuurentLockName = node;
                 return;
            }
            //用于阻塞未获取锁线程
            CountDownLatch countDownLatch = new CountDownLatch(1);


            //其他线程没拿到锁
            if (null != preNode){
                //如果前一个节点不为null 则去监听她
                Stat exists = zooKeeper.exists(preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeDeleted){
                            //监听到前一个节点的删除事件,则到计数器减一
                            countDownLatch.countDown();
                        }
                    }
                });
                if (exists != null){
                    //等待获取锁
                    countDownLatch.await();
                    //当前前一个节点删除,到计数器减一之后,当前线程获取锁
                    cuurentLockName = node;
                }

            }

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * zookeeper解锁
     * 主要做的就是删除当前锁节点删除
     */
    public void unlock(){

        try{
            if (StringUtils.isNotBlank(cuurentLockName)){
                zooKeeper.delete(cuurentLockName,-1);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

}

zkClient实现分布式锁

/**
 * zkClient客户端实现分布式锁
 * 基于有序临时节点
 */
public class ZkClientLock {

    private ZkClient zooKeeper;
    private static final String ZK_ADDRESS = "127.0.0.1";
    //锁根节点
    private static String rootName = "/zklock";
    //锁节点
    private String lockName;
    //当前锁节点名称
    private String cuurentLockName;
    //zookeeper 超时时间
    private int sessionTimeout = 100000;

    /**
     * 创建连接
     * @param lockName
     */
    public ZkClientLock(String lockName) {
        this.lockName = lockName;

        try{
            zooKeeper = new ZkClient(ZK_ADDRESS);

            //创建一个根节点/lock
            if (!zooKeeper.exists(rootName)){
                zooKeeper.create(rootName,"rootLock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    /**
     * 加锁
     */
    public void lock(){

        String node = zooKeeper.create(rootName + "/" + lockName,
                "zkclient".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);

        //获取所有子节点
        List<String> subNodes = zooKeeper.getChildren(rootName);
        TreeSet<String> sortNodes = new TreeSet<>();
        for (String subNode : subNodes){
            sortNodes.add(rootName + "/" +subNode);
        }
        //获取最小节点
        String firstNode = sortNodes.first();
        //获取前一个节点
        String preNode = sortNodes.lower(node);

        if (firstNode.equals(node)){
            //则当前节点为锁节点
            cuurentLockName= node;
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        if (StringUtils.isNotBlank(preNode)){
            //说明非锁节点,需要等待,并监听前一个节点
            boolean exists = zooKeeper.exists(preNode);
            if (exists){
                //监听前一个节点
                zooKeeper.subscribeDataChanges(preNode, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {

                    }

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        countDownLatch.countDown();
                    }
                });

                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cuurentLockName = node;
            }

        }

    }
    /**
     * zookeeper解锁
     * 主要做的就是删除当前锁节点删除
     */
    public void unlock(){

        try{
            if (StringUtils.isNotBlank(cuurentLockName)){
                zooKeeper.delete(cuurentLockName,-1);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读