浅谈分布式锁

2019-10-22  本文已影响0人  周若谷

为什么要有分布式锁

随着架构系统的演进,由纯真的单机架构到容器化编排的分布式架构,可以说是一个大型互联网企业发展的必然走向。在网站初创时,应用数量和用户较少,可以把 Tomcat 和Mysql 部署在同一台机器上。随着用户数量增多,访问量增大,并发升高,Tomcat 和 MySQL 竞争资源,此时,单机已经扛不住了,需要把 Tomcat 和 MySQL 分离在不同的机器上,用于提升单台机器的处理能力。业务从来没有减少,产品越做越大。应用也越来越复杂,原来的大应用,拆分成多个小应用,加入各级缓存,做了反向代理负责均衡,最后坠入分库分表的深渊。

微服务渐渐代替了庞大冗杂的服务,每个小服务,各司其职。这时候是不是就不存在资源竞争的问题了呢?答案毋庸置疑,在架构的演进过程中,无时无刻都存在着资源竞争的问题。

说起资源竞争的问题,是不是想起了在计算机科学中的一个经典问题——哲学家就餐,也就是在并行计算中多线程同步( Synchronization )时产生的问题?哲学家就餐问题用来解释死锁和资源耗尽的问题,我们不做详细的讨论,感兴趣的同学可以搜索资料了解。既然存在资源竞争的问题,解决的方案必然是对资源加锁,对于锁大家肯定不陌生,在 Java 中synchronized 关键字和 ReentrantLock 可重入锁在我们的代码或者一些开源代码中随处可见的,一般用于在本地多线程环境中控制对资源的并发访问。但是随着微服务架构的蓬勃兴起,分布式的快速发展,本地加锁已经不能满足我们的业务需求,如果还通过本地加锁的方式锁定资源,在分布式环境中是无用的。于是人们为了在分布式环境中也能实现本地锁的效果,也是纷纷各出其招。

Martin Kleppmann 是英国剑桥大学的分布式系统的研究员,之前和 Redis 之父 Antirez 进行过关于 RedLock(红锁,后续有讲到)是否安全的激烈讨论。Martin 认为一般我们使用分布式锁有两个场景:

分布式锁的特点

在了解分布式锁之前,我们首先要了解操作系统级别的锁(特指 Linux 系统)和 Java 编发编程时遇到的锁。对 Linux 锁和 Java锁有大概的了解后,我们深入分析分布式锁的实现机制。如果还想深入了解 Linux 的锁相关的信息,可查阅参考文章。

linux 锁的特点

在现代操作系统里,同一时间可能有多个内核执行命令在执行,因此内核其实像多进程多线程编程一样也需要一些同步机制来同步各执行单元对共享数据的访问。尤其是在多核CPU 系统上,更需要一些同步机制来同步不同处理器上的执行单元对共享的数据的访问。在主流的 Linux 内核中包含了几乎所有现代的操作系统具有的同步机制,这些同步机制包括:

Java锁的特点

在很多书写Java并发的文章中,我们经常看到有这些锁的概念。这些概念中,并不全指锁的状态,有的是指所得特性,有的是指所得设计。本文仅仅简要叙述锁的概念,不过多涉及Java锁的实现,这部分内容放在《Javaer不得不说的 Java “锁”事》一文中。

分布式锁的特点

对系统内核锁和Java锁有初步的了解之后,我们总结发现,所必需的要有以下特点:

常见分布式锁

一般实现分布式锁有以下几个方式:

下面就 MySQL 和 zk curator 客户端加锁的实现方式逐一列举,关于 Redis、Zk 原生客户端、etcd 等其他方式的分布式锁的实现原理,放在后面的章节。

MySQL分布式锁

MySQL实现分布式锁相对简单,创建一张锁资源表。

CREATE TABLE resource_lock (
    `id` BIGINT(20) UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
    `resource_name` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '资源名称',
    `node_info` VARCHAR(128) NULL DEFAULT NULL COMMENT '',
    `count` INT(10) NOT NULL DEFAULT '0' COMMENT '',
    `description` VARCHAR(128) NULL DEFAULT NULL COMMENT '',
    `gmt_create` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '',
    `gmt_modify` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '',
    UNIQUE KEY `uk_resource` (`resource_name`)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '资源加锁表';

前面分布式锁所说的 lock(),trylock(long timeout),trylock() 这几个方法可以用下面的伪代码实现。

lock()

lock一般是阻塞式的获取锁,阻塞知道获取到锁或者异常结束,那么我们可以写一个死循环来执行其操作:

public void lock() {
    while(true) {
        if (mysqlLock.lock(resoureName)) {
            return;
        }
        // 休眠3ms后重试
        LockSuprot.parkNanos(1000*1000*3);
    }
}

mysqlLock.lcok() 内部是一个SQL,为了达到可重入锁的效果那么我们应该先进行查询,如果有值,那么需要比较 node_info 是否一致,这里的 node_info 可以用机器 IP 和线程名字来表示,如果一致那么就加可重入锁 count 的值,如果不一致那么就返回false。如果没有值那么直接插入一条数据。伪代码如下:

public class MysqlLock {
    @Resource
    private MysqlLockMapper mysqlLockMapper;
    private NodeInfo nodeInfo;

    public MysqlLock(NodeInfo nodeInfo) {
        this.nodeInfo = nodeInfo;
    }

    @Transcation
    public boolean lock(String resourceName) {
        MyResource result = mysqlLockMapper.existsResource(resourceName);
        if (result != null) {
            if (Objects.equeals(nodeInfo, result.getNodeInfo())) {
                mysqlLockMapper.updateResourceCount(resourceName);
                return true;
            } else {
                return false;
            }
        } else {
            mysqlLockMapper.insertResource(resourceName, nodeInfo);
        }
    }
}

需要注意的是这一段代码需要加事务,必须要保证这一系列操作的原子性。

上面代码对应的三条 Mybatis 版的SQL语句如下:

-- mysqlLockMapper.existsResource

select * from resource_lock where resource_name = ${resourceName} for update

-- mysqlLockMapper.updateResourceCount

update resource_lock set count = count + 1 where resource_name = ${resourceName}

-- mysqlLockMapper.insertResource

insert into resource_lock(`resource_name`,`node_info`,`count`,`description`)
values(#{resourceName}, ${nodeInfo}, 1, '')

trylock()

tryLock() 是非阻塞获取锁,如果获取不到那么就会马上返回,代码可以如下:

public boolean tryLock() {
    return mysqlLock.lock(resourceName);
}

trylock(long millsecs)

public boolean trylock(long millsecs) {
    // 记录超时时间
    long deadline = System.currentTimeMillis() + millsecs;
    while(true) {
        if (mysqlLock.tryLock()) {
            return true;
        }
        deadline = deadline - millsecs;

        // 避免网络延迟引起加锁失败,增加自旋超时阈值,可设置为300ms
        if (deadline <= spinTimeoutThreshold) {
            return false;
        }
        if (millsecs <= 0) {
            return false;
        }
    }
}

mysqlLock.lock 和上面一样,但是要注意的是 select … for update 这个是阻塞的获取行锁,如果同一个资源并发量较大还是有可能会退化成阻塞的获取锁。

unlock()

public boolean unlock() {
    MyResource result = mysqlLockMapper.existsResource(resourceName);
    if (result != null) {
        if (Objects.equeals(nodeInfo, result.getNodeInfo())) {
            if (result.getCount() > 1) {
                // count - 1 
                mysqlLockMapper.decrementResource(resourceName);
            } else {
                mysqlLockMapper.deleteResource(resourceName);
            }
        } else {
            return false;
        }
    } else {
        return false;
    }
}

上面新增两条 Mybatis 版本的SQL语句:

-- mysqlLockMapper.decrementResource(resourceName)

update resource_lock set count = count - 1 where resource_name = ${resourceName}

-- mysqlLockMapper.deleteResource(resourceName)

delete from resource_lock where resource_name = ${resourceName}

锁超时

我们注意到,锁的释放是通过 delete 语句删除资源锁的,如果加锁的客户端由于某些原因挂掉了,锁就一直存在。这时,我们可以通过定时任务,在加锁的时候添加任务到任务系统,也可以通过定时任务检查释放锁。

ZK客户端Curator分布式锁

ZooKeeper也是我们常见的实现分布式锁方法,ZooKeeper 是以 Paxos 算法为基础分布式应用程序协调服务。Zk 的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册 Watcher 到上一个客户端,可以用下图表示。

/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。

Curator 封装了 Zookeeper 底层的 API,使我们更加容易方便的对 Zookeeper 进行操作,并且它封装了分布式锁的功能,这样我们就不需要再自己实现了。

Curator 实现了可重入锁(InterProcessMutex),也实现了不可重入锁(InterProcessSemaphoreMutex)。在可重入锁中还实现了读写锁。

Curator-Recipes实现了五种分布式锁:

下面就分布式可重入锁举例。

可重入锁InterProcessMutex

InterProcessMutex 是 Curator 实现的可重入锁,创建 InterProcessMutex 实例
InterProcessMutex 提供了两个构造方法,传入一个 CuratorFramework 实例和一个要使用的节点路径,InterProcessMutex 还允许传入一个自定义的驱动类,默认是使用 StandardLockInternalsDriver。

public InterProcessMutex(CuratorFramework client, String path);
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);

获取锁

使用 acquire 方法获取锁, acquire 方法有两种:

public void acquire() throws Exception;

获取锁,一直阻塞到获取到锁为止。获取锁的线程在获取锁后仍然可以调用 acquire() 获取锁(可重入)。 锁获取使用完后,调用了几次 acquire(),就得调用几次 release() 释放。

public boolean acquire(long time, TimeUnit unit) throws Exception;

与 acquire()类似,等待 time * unit 时间获取锁,如果仍然没有获取锁,则直接返回 false。

public class FakeLimitedResource {

    //总共250张火车票
    private Integer ticket = 250;

    public void use() throws InterruptedException {
        try {
            System.out.println("火车票还剩"+(--ticket)+"张!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class ExampleClientThatLocks {

    /** 锁 */
    private final InterProcessMutex lock;
    /** 共享资源 */
    private final FakeLimitedResource resource;
    /** 客户端名称 */
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if ( !lock.acquire(time, unit) ) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            //操作资源
            resource.use();
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); //总是在Final块中释放锁。
        }
    }
}

public class LockingExample {
    private static final int QTY = 5; // 并发操作线程数
    private static final int REPETITIONS = QTY * 10; // 资源总量
    private static final String CONNECTION_STRING = "127.0.0.1:2181";
    private static final String PATH = "/locks";

    public static void main(String[] args) throws Exception {

        //FakeLimitedResource模拟某些外部资源,这些外部资源一次只能由一个进程访问
        final FakeLimitedResource resource = new FakeLimitedResource();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            for ( int i = 0; i < QTY; ++i ){
                final int index = i;
                Callable<Void>  task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
                        try {
                            client.start();
                            ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
                            for ( int j = 0; j < REPETITIONS; ++j ) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        }catch ( InterruptedException e ){
                            Thread.currentThread().interrupt();
                        }catch ( Exception e ){
                            e.printStackTrace();
                        }finally{
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

起五个线程,即五个窗口卖票,五个客户端分别有50张票可以卖,先是尝试获取锁,操作资源后,释放锁。

加锁的流程具体如下:

  1. 首先进行可重入的判定:这里的可重入锁记录在 ConcurrentMap,threadData 这个 Map 里面,如果threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加1。我们之前的 Mysql 其实也可以通过这种方法去优化,可以不需要 count 字段的值,将这个维护在本地可以提高性能。
  2. 然后在我们的资源目录下创建一个节点:比如这里创建一个 /0000000002 这个节点,这个节点需要设置为 EPHEMERAL_SEQUENTIAL 也就是临时节点并且有序。
  3. 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
  4. 如果是第一个,则获取到锁,那么可以返回。
  5. 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。/0000000002 的前一个节点是 /0000000001,我们获取到这个节点之后,再上面注册Watcher(这里的 watcher 其实调用的是 object.notifyAll(),用来解除阻塞)。
  6. object.wait(timeout) 或 object.wait() :进行阻塞等待这里和我们第5步的watcher相对应。

释放锁

线程通过 acquire() 获取锁时,可通过 release()进行释放,如果该线程多次调用了 acquire() 获取锁,则如果只调用一次 release() 该锁仍然会被该线程持有。

note:同一个线程中InterProcessMutex实例是可重用的,也就是不需要在每次获取锁的时候都new一个InterProcessMutex实例,用同一个实例就好。

解锁的具体流程:

  1. 首先进行可重入锁的判定: 如果有可重入锁只需要次数减 1 即可,减1之后加锁次数为 0 的话继续下面步骤,不为 0 直接返回。
  2. 删除当前节点。
  3. 删除 threadDataMap 里面的可重入锁的数据。

读写锁

Curator提供了读写锁,其实现类是 InterProcessReadWriteLock,这里的每个节点都会加上前缀:

private static final String READ_LOCK_NAME  = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";

根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将 watcher 注册到和自己最近的写锁。写锁的逻辑和我们之前分析的依然保持不变。

锁超时

Zookeeper不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个ZK的session,通过这个session,ZK可以判断机器是否宕机。如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。

分布式锁的安全问题

GC的STW

对于这个问题可以看见基本所有的都会出现问题,Martin 给出了一个解法,对于 ZK 这种他会生成一个自增的序列,那么我们真正进行对资源操作的时候,需要判断当前序列是否是最新,有点类似于我们乐观锁。当然这个解法Redis作者进行了反驳,你既然都能生成一个自增的序列了那么你完全不需要加锁了,也就是可以按照类似于Mysql乐观锁的解法去做。

时钟发生跳跃

Martin 觉得 RedLock 不安全很大的原因也是因为时钟的跳跃,因为锁过期强依赖于时间,但是 ZK 不需要依赖时间,依赖每个节点的 Session。Redis作者也给出了解答:对于时间跳跃分为人为调整和 NTP 自动调整。

长时间的网络I/O

这一块不是他们讨论的重点,我自己觉得,对于这个问题的优化可以控制网络调用的超时时间,把所有网络调用的超时时间相加,那么我们锁过期时间其实应该大于这个时间,当然也可以通过优化网络调用比如串行改成并行,异步化等。可以参考下面两篇文章:
并行化-你的高并发大杀器异步化-你的高并发大杀器

参考文章

  1. Linux内核中的各种锁
  2. linux几种锁的分析与比较
  3. 锁的种类与特点
  4. 聊聊分布式锁
  5. zookeeper开源客户端Curator典型应用场景之-分布式锁

该文首发《虚怀若谷》个人博客,转载前请务必署名,转载请标明出处。

古之善为道者,微妙玄通,深不可识。夫唯不可识,故强为之容:

豫兮若冬涉川,犹兮若畏四邻,俨兮其若客,涣兮若冰之释,敦兮其若朴,旷兮其若谷,混兮其若浊。

孰能浊以静之徐清?孰能安以动之徐生?

保此道不欲盈。夫唯不盈,故能敝而新成。

上一篇 下一篇

猜你喜欢

热点阅读