两小时玩转分布式锁

2020-04-22  本文已影响0人  吗丁啉要餐前吃

前言

在实际开发过程中,当遇到高并发的场景时,我们通常会使用锁来保证线程安全。例如ReentrantLock,Synchronized。但是,熏弟,2020年了,应用都开始分布式了。普通的锁已经无法满足我们对程序加锁的欲望了。那么,如何解决呢?
首先,我们先要搞懂“锁”的基本原理。ReentrantLock是怎么实现锁的呢?(暂时不考虑AQS队列,只写最基础的方法。)

锁的基本流程.png
看完以后,是不是觉得so easy?关键就在于这个state,在jdk的lock中,可以使用volite关键字来进行状态的维护。那么我们,首先就可以考虑如何在分布式环境中维护这个状态。想想我们日常用到的技术中,哪些地方有这样的唯一性约束条件。聪明的你,有没有一些大胆的想法?mysql的主键索引,正好满足这个要求。
废话不多说,开整。

代码实例

  1. 首先,作为一名有逼格的程序员,我们得面向接口编程,先上接口。
public interface Lock {
    //获取锁
    void getLock();
    //释放锁
    void unLock();
}

2.第二步,用一个抽象类,构建锁的基本流程(模板方法模式)

public abstract class AbstractLock implements Lock {

    @Override
    public void getLock() {
        //竞争锁
        if(tryLock()){
            System.out.println("获取Lock锁资源");
        }else {
            //任务阻塞
            waitLock();
            //重新获取锁
            getLock();
        }
    }

    //占有锁
    protected abstract  boolean tryLock();
    //等待锁
    protected abstract void waitLock();
}

接下来,就是对锁的实现了。
其实除了利用mysql外,我们还可以利用redis的setnx特性,zookeeper的临时节点和监听机制来实现。那我门就一个一个来吧。
3.锁的实现
(1)Mysql实现分布式锁

public class MysqlLock  extends AbstractLock{
    @Autowired
    private JdbcTemplate jdbcTemplate;
    private static final int LOCK_ID=1;

    @Override
    protected boolean tryLock() {
        try {
            jdbcTemplate.update("insert into mlock values (?)",LOCK_ID);//定义mlock表,只有一个id主键字段
        }catch (Exception e){
            return false;
        }
        return true;
    }

    @Override
    protected void waitLock() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void unLock() {
        jdbcTemplate.update("delete from mlock where id=?",LOCK_ID);
    }
}

(2)redis实现分布式锁

public class RedisLock extends AbstractLock {
    @Autowired
    private JedisConnectionFactory factory;

    public static final String KEY = "LOCK_KEY";
    private ThreadLocal<String> local = new ThreadLocal<>();

    @Override
    protected boolean tryLock() {
        String uuid = UUID.randomUUID().toString();//定义uuid,避免解锁的时候把别人的锁解了
        Jedis jedis = (Jedis) factory.getConnection().
        String res = jedis.set(KEY, uuid, "NX", "PX", 10000);getNativeConnection();//设置临时节点,并设置过期时间为10s,避免死锁
        if("OK".equals(res)){
            local.set(uuid);
            return true;
        }
        return false;
    }

    @Override
    protected void waitLock() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
/**
     * 使用lua脚本进行删除操作,保证了get和del两步操作的原子性
     * */
    @Override
    public void unLock() {
        //读取lua脚本
        String script = FileUtils.readFile("unlock.lua");
        //获取redis的原始连接
        Jedis jedis = (Jedis) factory.getConnection().getNativeConnection();
        //通过原始连接连接redis执行脚本
        jedis.eval(script, Arrays.asList(KEY),Arrays.asList(local.get()));
    }
}

lua脚本

if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1]) 
else 
    return 0 
end

(3)zookeeper实现分布式锁
第一种实现方式

public class ZkLock extends AbstractLock {

    private final static String PATH="/lock";

    // zk连接地址
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    // 创建zk连接
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);

    private CountDownLatch countDownLatch;

    @Override
    protected boolean tryLock() {
        try {
            /**创建临时节点*/
            zkClient.createEphemeral(PATH);
        } catch (RuntimeException e) {
            return false;
        }
        return true;
    }

    @Override
    protected void waitLock() {
        /**监听数据变化*/
        IZkDataListener iZkDataListener = new IZkDataListener(){
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                /** 节点删除,唤醒被等待的线程*/
                if(countDownLatch!=null){
                    countDownLatch.countDown();
                }
            }
        };
        /**PATH 节点订阅监听器*/
        zkClient.subscribeDataChanges(PATH,iZkDataListener);
        if(zkClient.exists(PATH)){
            countDownLatch=new CountDownLatch(1);
            try {
                /**  阻塞,一直等到接收到事件通知*/
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(PATH,iZkDataListener);
    }

    @Override
    public void unLock() {
        if(zkClient!=null){
            zkClient.delete(PATH);
            zkClient.close();
            System.out.println("释放锁资源");
        }

    }
}

这种实现方式会存在"惊群效应",当锁被释放的时候,其他的进程会一起去抢这把锁,可能导致有的进程永远无法获得到锁。那么,我们是不是可以像公平锁那样,让进程排队,挨个去用这把锁呢。zookeeper刚好有有序节点功能,我们可以利用。
第二种实现方式

public class ZkLock2 extends AbstractLock {
    private static final String PATH2 = "/lock2";

    // zk连接地址
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    // 创建zk连接
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);

    private CountDownLatch countDownLatch= null;

    private String beforePath;//当前请求的节点前一个节点
    private String currentPath;//当前请求的节点

    public ZkLock2() {
        if (!this.zkClient.exists(PATH2)) {
            this.zkClient.createPersistent(PATH2);
        }
    }

    @Override
    public boolean  tryLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if(currentPath == null || currentPath.length()<= 0){
            //创建一个临时顺序节点
            currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
        }
        //获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.zkClient.getChildren(PATH2);
        Collections.sort(childrens);

        if (currentPath.equals(PATH2 + '/'+childrens.get(0))) {//如果当前节点在所有节点中排名第一则获取锁成功
            return true;
        } else {//如果当前节点在所有节点中排名中不是排名第一,则获取前面的节点名称,并赋值给beforePath
            int wz = Collections.binarySearch(childrens,
                    currentPath.substring(7));
            beforePath = PATH2 + '/'+childrens.get(wz-1);
        }
        return false;

    }

    @Override
    public void waitLock() {
        IZkDataListener listener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {

                if(countDownLatch!=null){
                    countDownLatch.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //给排在前面的的节点增加数据删除的watcher,本质是启动另外一个线程去监听前置节点
        this.zkClient.subscribeDataChanges(beforePath, listener);

        if(this.zkClient.exists(beforePath)){
            countDownLatch=new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unLock() {
        //删除当前临时节点
        zkClient.delete(currentPath);
        zkClient.close();
        System.out.println("释放锁");
    }
}

总结

以上就是三种分布式锁的实现了,三种锁中,mysql和redis会存在死锁的可能,我建议大家使用zookeeper的实现。

上一篇 下一篇

猜你喜欢

热点阅读