Javaspring boot分布式

009.分布式协调(分布式锁)

2019-12-16  本文已影响0人  撸帝

特别说明: 本人平时混迹于 B 站,不咋回复这里的评论,有问题可以到 B 站视频评论区留言找我
视频地址: https://space.bilibili.com/31137138/favlist?fid=326428938
课件说明: 本次提供的课件是 Spring Cloud Netflix 版微服务架构指南,如果有兴趣想要学习 Spring Cloud Alibaba 版,可以前往 http://www.qfdmy.com 查看相关课程资源
案例代码: https://github.com/topsale/hello-spring-cloud-netflix

什么是分布式协调

分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的后果。

import7841-1.png

在这图中有三台机器,每台机器各跑一个应用程序。然后我们将这三台机器通过网络将其连接起来,构成一个系统来为用户提供服务,对用户来说这个系统的架构是透明的,他感觉不到我这个系统是一个什么样的架构。那么我们就可以把这种系统称作一个分布式系统

在这个分布式系统中如何对进程进行调度,我假设在第一台机器上挂载了一个资源,然后这三个物理分布的进程都要竞争这个资源,但我们又不希望他们同时进行访问,这时候我们就需要一个协调器,来让他们有序的来访问这个资源。这个协调器就是我们经常提到的那个,比如说"进程-1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。这个分布式锁也就是我们分布式协调技术实现的核心内容。

什么是分布式锁

为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁

为什么需要分布式锁

Lusifer201810170001.png

如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题,这就是分布式锁要解决的问题

分布式锁应该具备哪些条件

分布式锁的实现有哪些

Redis 实现分布式锁

分布式锁实现的三个核心要素:

加锁

最简单的方法是使用 setnx 命令。key 是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给 key 命名为 “lock_sale_商品ID” 。而 value 设置成什么呢?我们可以姑且设置成 1。加锁的伪代码如下:

setnx(lock_sale_商品ID,1)

当一个线程执行 setnx 返回 1,说明 key 原本不存在,该线程成功得到了锁;当一个线程执行 setnx 返回 0,说明 key 已经存在,该线程抢锁失败。

解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行 del 指令,伪代码如下:

del(lock_sale_商品ID)

释放锁之后,其他线程就可以继续执行 setnx 命令来获得锁。

锁超时

锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住(死锁),别的线程再也别想进来。所以,setnxkey 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx 不支持超时参数,所以需要额外的指令,伪代码如下:

expire(lock_sale_商品ID, 30)

综合伪代码如下:

if(setnx(lock_sale_商品ID,1) == 1){
    expire(lock_sale_商品ID,30)
    try {
        do something ......
    } finally {
        del(lock_sale_商品ID)
    }
}

存在什么问题

以上伪代码中存在三个致命问题

设想一个极端场景,当某线程执行 setnx,成功得到了锁:

Lusifer201810170002.png

setnx 刚执行成功,还未来得及执行 expire 指令,节点 1 挂掉了。

Lusifer201810170003.png

这样一来,这把锁就没有设置过期时间,变成死锁,别的线程再也无法获得锁了。

怎么解决呢?setnx 指令本身是不支持传入超时时间的,set 指令增加了可选参数,伪代码如下:

set(lock_sale_商品ID,1,30,NX)

这样就可以取代 setnx 指令。

又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是 30 秒。

Lusifer201810170004.png

如果某些原因导致线程 A 执行的很慢很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。

Lusifer201810170005.png

随后,线程 A 执行完了任务,线程 A 接着执行 del 指令来释放锁。但这时候线程 B 还没执行完,线程A实际上 删除的是线程 B 加的锁

Lusifer201810170006.png

怎么避免这种情况呢?可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁。至于具体的实现,可以在加锁的时候把当前的线程 ID 当做 value,并在删除之前验证 key 对应的 value 是不是自己线程的 ID。

加锁:

String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)

解锁:

if(threadId .equals(redisClient.get(key))){
    del(key)
}

但是,这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。

还是刚才第二点所描述的场景,虽然我们避免了线程 A 误删掉 key 的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。

Lusifer201810170004.png

当过去了 29 秒,线程 A 还没执行完,这时候守护线程会执行 expire 指令,为这把锁“续命”20 秒。守护线程从第 29 秒开始执行,每 20 秒执行一次。

Lusifer201810170007.png

当线程 A 执行完任务,会显式关掉守护线程。

Lusifer201810170008.png

另一种情况,如果节点 1 忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。

Lusifer201810170009.png

什么是 Zookeeper

ZooKeeper 是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。ZooKeeper 允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

Zookeeper 的数据模型

Zookeeper 的数据模型是什么样子呢?它很像数据结构当中的树,也很像文件系统的目录。

Lusifer201810170010.png

树是由节点所组成,Zookeeper 的数据存储也同样是基于节点,这种节点叫做 Znode

但是,不同于树的节点,Znode 的引用方式是路径引用,类似于文件路径:

/动物/猫
/汽车/宝马

这样的层级结构,让每一个 Znode 节点拥有唯一的路径,就像命名空间一样对不同信息作出清晰的隔离。

Znode 包含哪些元素

Lusifer201810170011.png

这里需要注意一点,Zookeeper 是为读多写少的场景所设计。Znode 并不是用来存储大规模业务数据,而是用于存储少量的状态和配置信息,每个节点的数据最大不能超过 1MB

Zookeeper 的基本操作

create
delete
exists
getData
setData
getChildren

这其中,existsgetDatagetChildren 属于读操作。Zookeeper 客户端在请求读操作的时候,可以选择是否设置 Watch

Zookeeper 的事件通知

我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。当这个 Znode 发生改变,也就是调用了 createdeletesetData 方法的时候,将会触发 Znode 上注册的对应事件,请求 Watch 的客户端会接收到异步通知。

具体交互过程如下:

Lusifer201810170012.png Lusifer201810170013.png

Zookeeper 的一致性

Zookeeper 身为分布式系统协调服务,如果自身挂了如何处理呢?为了防止单机挂掉的情况,Zookeeper 维护了一个集群。如下图:

微信图片_20181017182528.jpg

Zookeeper Service 集群是一主多从结构。

在更新数据时,首先更新到主节点(这里的节点是指服务器,不是 Znode),再同步到从节点。

在读取数据时,直接读取任意从节点。

为了保证主从节点的数据一致性,Zookeeper 采用了 ZAB 协议,这种协议非常类似于一致性算法 PaxosRaft

什么是 ZAB

Zookeeper Atomic Broadcast,有效解决了 Zookeeper 集群崩溃恢复,以及主从同步数据的问题。

最大 ZXID 也就是节点本地的最新事务编号,包含 epoch 和计数两部分。epoch 是纪元的意思,相当于 Raft 算法选主时候的 term。

假如 Zookeeper 当前的主节点挂掉了,集群会进行崩溃恢复。ZAB 的崩溃恢复分成三个阶段:

Leader election

选举阶段,此时集群中的节点处于 Looking 状态。它们会各自向其他节点发起投票,投票当中包含自己的服务器 ID 和最新事务 ID(ZXID)。

Lusifer201810170014.png

接下来,节点会用自身的 ZXID 和从其他节点接收到的 ZXID 做比较,如果发现别人家的 ZXID 比自己大,也就是数据比自己新,那么就重新发起投票,投票给目前已知最大的 ZXID 所属节点。

Lusifer201810170015.png

每次投票后,服务器都会统计投票数量,判断是否有某个节点得到半数以上的投票。如果存在这样的节点,该节点将会成为准 Leader,状态变为 Leading。其他节点的状态变为 Following。

Lusifer201810170016.png

Discovery

发现阶段,用于在从节点中发现最新的 ZXID 和事务日志。或许有人会问:既然 Leader 被选为主节点,已经是集群里数据最新的了,为什么还要从节点中寻找最新事务呢?

这是为了防止某些意外情况,比如因网络原因在上一阶段产生多个 Leader 的情况。

所以这一阶段,Leader 集思广益,接收所有 Follower 发来各自的最新 epoch 值。Leader 从中选出最大的 epoch,基于此值加 1,生成新的 epoch 分发给各个 Follower。

各个 Follower 收到全新的 epoch 后,返回 ACK 给 Leader,带上各自最大的 ZXID 和历史事务日志。Leader 选出最大的 ZXID,并更新自身历史日志。

Synchronization

同步阶段,把 Leader 刚才收集得到的最新历史事务日志,同步给集群中所有的 Follower。只有当半数 Follower 同步成功,这个准 Leader 才能成为正式的 Leader。

自此,故障恢复正式完成。

Broadcast

ZAB 的数据写入涉及到 Broadcast 阶段,简单来说,就是 Zookeeper 常规情况下更新数据的时候,由 Leader 广播到所有的 Follower。其过程如下:

微信图片_20181017192657.jpg

ZAB 协议既不是强一致性,也不是弱一致性,而是处于两者之间的单调一致性(顺序一致性)。它依靠事务 ID 和版本号,保证了数据的更新和读取是有序的。

Zookeeper 的应用场景

分布式锁

这是雅虎研究员设计 Zookeeper 的初衷。利用 Zookeeper 的临时顺序节点,可以轻松实现分布式锁。

服务注册和发现

利用 Znode 和 Watcher,可以实现分布式服务的注册和发现。最著名的应用就是阿里的分布式 RPC 框架 Dubbo。

共享配置和状态信息

Redis 的分布式解决方案 Codis,就利用了 Zookeeper 来存放数据路由表和 codis-proxy 节点的元信息。同时 codis-config 发起的命令都会通过 ZooKeeper 同步到各个存活的 codis-proxy。

此外,Kafka、HBase、Hadoop,也都依靠 Zookeeper 同步节点信息,实现高可用。

Zookeeper 实现分布式锁

Znode 的四种类型

Lusifer201810170010.png

Zookeeper 的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做 Znode。

默认的节点类型。创建节点的客户端与 Zookeeper 断开连接后,该节点依旧存在。

所谓顺序节点,就是在创建节点时,Zookeeper 根据创建的时间顺序给该节点名称进行编号:

Lusifer201810190001.png

和持久节点相反,当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除:

Lusifer201810190002.png Lusifer201810190003.png Lusifer201810190004.png

顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper 根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除。

Zookeeper 分布式锁的原理

Zookeeper 分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:

首先,在 Zookeeper 当中创建一个持久节点 ParentLock。当第一个客户端想要获得锁时,需要在 ParentLock 这个节点下面创建一个临时顺序节点 Lock1。

Lusifer201810190005.png

之后,Client1 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock1 是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

Lusifer201810190006.png

这时候,如果再有一个客户端 Client2 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock2。

Lusifer201810190007.png

Client2 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock2 是不是顺序最靠前的一个,结果发现节点 Lock2 并不是最小的。

于是,Client2 向排序仅比它靠前的节点 Lock1 注册 Watcher,用于监听 Lock1 节点是否存在。这意味着 Client2 抢锁失败,进入了等待状态。

Lusifer201810190008.png

这时候,如果又有一个客户端 Client3 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock3。

Lusifer201810190009.png

Client3 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock3 是不是顺序最靠前的一个,结果同样发现节点 Lock3 并不是最小的。

于是,Client3 向排序仅比它靠前的节点 Lock2 注册 Watcher,用于监听 Lock2 节点是否存在。这意味着 Client3 同样抢锁失败,进入了等待状态。

Lusifer201810190010.png

这样一来,Client1 得到了锁,Client2 监听了 Lock1,Client3 监听了 Lock2。这恰恰形成了一个等待队列,

释放锁分为两种情况:

  1. 任务完成,客户端显示释放

当任务完成时,Client1 会显示调用删除节点 Lock1 的指令。

Lusifer201810190011.png
  1. 任务执行过程中,客户端崩溃

获得锁的 Client1 在任务执行过程中,如果崩溃,则会断开与 Zookeeper 服务端的链接。根据临时节点的特性,相关联的节点 Lock1 会随之自动删除。

Lusifer201810190012.png

由于 Client2 一直监听着 Lock1 的存在状态,当 Lock1 节点被删除,Client2 会立刻收到通知。这时候 Client2 会再次查询 ParentLock 下面的所有节点,确认自己创建的节点 Lock2 是不是目前最小的节点。如果是最小,则 Client2 顺理成章获得了锁。

Lusifer201810190013.png

同理,如果 Client2 也因为任务完成或者节点崩溃而删除了节点 Lock2,那么 Client3 就会接到通知。

Lusifer201810190014.png

最终,Client3 成功得到了锁。

Lusifer201810190015.png

实战 Redisson 实现分布式锁

Redisson 目前是官方唯一推荐的 Java 版的分布式锁并支持 Redlock

什么是 Redisson

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括 (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson 应用场景

Redisson 结构

Redisson 作为独立节点可以用于独立执行其他节点发布到分布式执行服务和分布式调度任务服务里的远程任务。

68747470733a2f2f7265646973736f6e2e6f72672f6172636869746563747572652e706e67.png

Redisson 操作对象

package com.funtl.hello.spring.cloud.commons.redisson.operation;

import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.redisson.spring.starter.RedissonProperties;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

public class RedissonObject {

    /**
     * 数据缓存时间,默认 30 分钟
     */
    private static final Long DATA_VALID_TIME = 1000 * 60 * 30L;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private RedissonProperties redissonProperties;

    /**
     * 获取对象值
     *
     * @param name
     * @param <T>
     * @return
     */
    public <T> T getValue(String name) {
        RBucket<T> bucket = redissonClient.getBucket(name);
        return bucket.get();
    }

    /**
     * 获取对象空间
     *
     * @param name
     * @param <T>
     * @return
     */
    public <T> RBucket<T> getBucket(String name) {
        return redissonClient.getBucket(name);
    }

    /**
     * 设置对象的值
     *
     * @param name  键
     * @param value 值
     * @return
     */
    public <T> void setValue(String name, T value) {
        setValue(name, value, DATA_VALID_TIME);
    }

    /**
     * 设置对象的值
     *
     * @param name  键
     * @param value 值
     * @param time  缓存时间 单位毫秒 -1 永久缓存
     * @return
     */
    public <T> void setValue(String name, T value, Long time) {
        RBucket<Object> bucket = redissonClient.getBucket(name);
        if (time == -1) {
            bucket.set(value);
        } else {
            bucket.set(value, time, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * 如果值已经存在则则不设置
     *
     * @param name  键
     * @param value 值
     * @param time  缓存时间 单位毫秒
     * @return true 设置成功,false 值存在,不设置
     */
    public <T> Boolean trySetValue(String name, T value, Long time) {
        RBucket<Object> bucket = redissonClient.getBucket(name);
        boolean b;
        if (time == -1) {
            b = bucket.trySet(value);
        } else {
            b = bucket.trySet(value, time, TimeUnit.MILLISECONDS);
        }
        return b;
    }

    /**
     * 如果值已经存在则则不设置
     *
     * @param name  键
     * @param value 值
     * @return true 设置成功,false 值存在,不设置
     */
    public <T> Boolean trySetValue(String name, T value) {
        return trySetValue(name, value, DATA_VALID_TIME);
    }

    /**
     * 删除对象
     *
     * @param name 键
     * @return true 删除成功,false 不成功
     */
    public Boolean delete(String name) {
        return redissonClient.getBucket(name).delete();
    }

}
package com.funtl.hello.spring.cloud.commons.redisson.operation;

import org.redisson.api.RBinaryStream;
import org.redisson.api.RListMultimap;
import org.redisson.api.RedissonClient;

import javax.annotation.Resource;
import java.io.InputStream;
import java.io.OutputStream;

public class RedissonBinary {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 获取输出流
     *
     * @param name
     * @return
     */
    public OutputStream getOutputStream(String name) {
        RListMultimap<Object, Object> listMultimap = redissonClient.getListMultimap("");
        RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
        return binaryStream.getOutputStream();
    }

    /**
     * 获取输入流
     *
     * @param name
     * @return
     */
    public InputStream getInputStream(String name) {
        RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
        return binaryStream.getInputStream();
    }

    /**
     * 获取输入流
     *
     * @param name
     * @return
     */
    public InputStream getValue(String name, OutputStream stream) {
        try {
            RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
            InputStream inputStream = binaryStream.getInputStream();
            byte[] buff = new byte[1024];
            int len;
            while ((len = inputStream.read(buff)) != -1) {
                stream.write(buff, 0, len);
            }
            return binaryStream.getInputStream();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取对象空间
     *
     * @param name
     * @return
     */
    public RBinaryStream getBucket(String name) {
        return redissonClient.getBinaryStream(name);
    }

    /**
     * 设置对象的值
     *
     * @param name  键
     * @param value 值
     * @return
     */
    public void setValue(String name, InputStream value) {
        try {
            RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
            binaryStream.delete();
            OutputStream outputStream = binaryStream.getOutputStream();
            byte[] buff = new byte[1024];
            int len;
            while ((len = value.read(buff)) != -1) {
                outputStream.write(buff, 0, len);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除对象
     *
     * @param name 键
     * @return true 删除成功,false 不成功
     */
    public Boolean delete(String name) {
        RBinaryStream binaryStream = redissonClient.getBinaryStream(name);
        return binaryStream.delete();
    }

}
package com.funtl.hello.spring.cloud.commons.redisson.operation;

import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class RedissonCollection {

    /**
     * 数据缓存时间,默认 30 分钟
     */
    private static final Long DATA_VALID_TIME = 1000 * 60 * 30L;

    @Resource
    private RedissonClient redissonClient;

    /**
     * 获取map集合
     *
     * @param name
     * @param <K>
     * @param <V>
     * @return
     */
    public <K, V> RMap<K, V> getMap(String name) {
        return redissonClient.getMap(name);
    }

    /**
     * 设置map集合
     *
     * @param name
     * @param data
     * @param time 缓存时间,单位毫秒 -1永久缓存
     * @return
     */
    public void setMapValues(String name, Map data, Long time) {
        RMap map = redissonClient.getMap(name);
        Long dataValidTime = DATA_VALID_TIME;
        if (time != -1) {
            map.expire(dataValidTime, TimeUnit.MILLISECONDS);
        }
        map.putAll(data);
    }

    /**
     * 设置map集合
     *
     * @param name
     * @param data
     * @return
     */
    public void setMapValues(String name, Map data) {
        setMapValues(name, data, DATA_VALID_TIME);
    }

    /**
     * 获取List集合
     *
     * @param name
     * @return
     */
    public <T> RList<T> getList(String name) {
        return redissonClient.getList(name);
    }

    /**
     * 设置List集合
     *
     * @param name
     * @param data
     * @param time 缓存时间,单位毫秒 -1永久缓存
     * @return
     */
    public void setListValues(String name, List data, Long time) {
        RList list = redissonClient.getList(name);
        Long dataValidTime = DATA_VALID_TIME;
        if (time != -1) {
            list.expire(dataValidTime, TimeUnit.MILLISECONDS);
        }
        list.addAll(data);
    }

    /**
     * 设置List集合
     *
     * @param name
     * @param data
     * @return
     */
    public void setListValues(String name, List data) {
        setListValues(name, data, DATA_VALID_TIME);
    }

    /**
     * 获取set集合
     *
     * @param name
     * @return
     */
    public <T> RSet<T> getSet(String name) {
        return redissonClient.getSet(name);
    }

    /**
     * 设置set集合
     *
     * @param name
     * @param data
     * @param time 缓存时间,单位毫秒 -1永久缓存
     * @return
     */
    public void setSetValues(String name, Set data, Long time) {
        RSet set = redissonClient.getSet(name);
        Long dataValidTime = DATA_VALID_TIME;
        if (time != -1) {
            set.expire(dataValidTime, TimeUnit.MILLISECONDS);
        }
        set.addAll(data);
    }

    /**
     * 设置set集合
     *
     * @param name
     * @param data
     * @return
     */
    public void setSetValues(String name, Set data) {
        setSetValues(name, data, DATA_VALID_TIME);
    }


}
package com.funtl.hello.spring.cloud.commons.configuration;

import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonBinary;
import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonCollection;
import com.funtl.hello.spring.cloud.commons.redisson.operation.RedissonObject;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfiguration {

    @Bean
    @ConditionalOnMissingBean(RedissonObject.class)
    public RedissonObject redissonObject() {
        return new RedissonObject();
    }

    @Bean
    @ConditionalOnMissingBean(RedissonBinary.class)
    public RedissonBinary redissonBinary() {
        return new RedissonBinary();
    }

    @Bean
    @ConditionalOnMissingBean(RedissonCollection.class)
    public RedissonCollection redissonCollection() {
        return new RedissonCollection();
    }

}

通过 RLock 对象操作分布式锁

注意: 此处新建一个名为 provider-item-service 的服务提供者,复制之前创建的 provider-admin-service 项目并修改相关配置即可

create table tb_item (id int not null primary key,name varchar(100),num int not null);
insert into tb_item(id, name,num) values(1000000, 'Apple', 3);
package com.funtl.hello.spring.cloud.provider.controller;

import com.funtl.hello.spring.cloud.provider.domain.TbItem;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@Slf4j
@RestController
public class ProviderItemController {

    @Resource
    private RedissonClient redissonClient;

    @PostMapping(value = "num/local")
    public String testNumLock(TbItem tbItem) {
        // 加锁,此处根据商品名称加锁
        RLock lock = redissonClient.getLock(tbItem.getName());
        lock.lock();
        log.info("Thread {} 拿到了 {} 的锁", Thread.currentThread().getId(), tbItem.getName());

        try {
            // 阻塞模拟业务操作时间
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 解锁
        lock.unlock();
        log.info("Thread {} 释放了 {} 的锁", Thread.currentThread().getId(), tbItem.getName());

        return "ok";
    }
}
Thread 104 拿到了 Apple 的锁
Thread 104 释放了 Apple 的锁
Thread 106 拿到了 Apple 的锁
Thread 106 释放了 Apple 的锁

通过自定义注解操作分布式锁

从上面的代码可以看出这是一个典型的环绕切面,我们可以使用 AOP 思想将交叉业务剥离出来,采用注解的方式切面操作分布式锁

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
</dependency>
package com.funtl.hello.spring.cloud.commons.redisson.enums;

public enum RedissonLockModel {

    /**
     * 可重入锁:某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
     */
    REENTRANT,

    /**
     * 公平锁:加锁前先查看是否有排队等待的线程,有的话优先处理排在前面的线程
     */
    FAIR,

    /**
     * 联锁:可以把一组锁当作一个锁来加锁和释放
     * 基于 Redis 的分布式 RedissonMultiLock 对象将多个 RLock 对象分组,并将它们作为一个锁处理。
     * 每个 RLock 对象可能属于不同的 Redisson 实例
     */
    MULTIPLE,

    /**
     * 红锁:用于解决异步数据丢失和脑裂问题
     * 假设有多个 Redis 节点,这些节点之间既没有主从,也没有集群关系。
     * 客户端用相同的 key 和随机值在多个节点上请求锁,请求锁的超时时间应小于锁自动释放时间。
     * 当超过半数 Redis 上请求到锁的时候,才算是真正获取到了锁。
     * 如果没有获取到锁,则把部分已锁的 Redis 释放掉
     */
    REDLOCK,

    /**
     * 读锁(共享锁):共享用于不更改或不更新数据的操作(只读操作),如 SELECT 语句
     * 如果事务 T 对数据 A 加上共享锁后,则其他事务只能对 A 再加共享锁,不能加排他锁。
     * 获准共享锁的事务只能读数据,不能修改数据
     */
    READ,

    /**
     * 写锁(排他锁):用于数据修改操作,例如 INSERT、UPDATE 或 DELETE。确保不会同时同一资源进行多重更新
     * 如果事务 T 对数据 A 加上排他锁后,则其他事务不能再对 A 加任何类型的锁。
     * 获准排他锁的事务既能读数据,又能修改数据。
     * 我们在操作数据库的时候,可能会由于并发问题而引起的数据的不一致性(数据冲突)
     */
    WRITE,

    /**
     * 自动模式,当参数只有一个使用 REENTRANT 参数多个 REDLOCK
     */
    AUTO

}
package com.funtl.hello.spring.cloud.commons.redisson.excepiton;

public class RedissonLockException extends RuntimeException {

    public RedissonLockException() {
    }

    public RedissonLockException(String message) {
        super(message);
    }

    public RedissonLockException(String message, Throwable cause) {
        super(message, cause);
    }

    public RedissonLockException(Throwable cause) {
        super(cause);
    }

    public RedissonLockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }

}
package com.funtl.hello.spring.cloud.commons.redisson.annotation;

import com.funtl.hello.spring.cloud.commons.redisson.enums.RedissonLockModel;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedissonLock {

    /**
     * 锁的模式:如果不设置,自动模式,当参数只有一个使用 REENTRANT 参数多个 MULTIPLE
     *
     * @return
     */
    RedissonLockModel lockModel() default RedissonLockModel.AUTO;

    /**
     * 如果 keys 有多个,如果不设置则使用联锁
     *
     * @return
     */
    String[] keys() default {};

    /**
     * key 的静态常量:当 key 的 spel 的值是 List,数组时使用 + 号连接将会被 spel 认为这个变量是个字符串,只能产生一把锁,达不到我们的目的
     * 而我们如果又需要一个常量的话这个参数将会在拼接在每个元素的后面
     *
     * @return
     */
    String keyConstant() default "";

    /**
     * 锁超时时间,默认 30000 毫秒
     *
     * @return
     */
    long lockWatchdogTimeout() default 0;

    /**
     * 等待加锁超时时间,默认 10000 毫秒 -1 则表示一直等待
     *
     * @return
     */
    long attemptTimeout() default 0;


}
package com.funtl.hello.spring.cloud.commons.redisson.aop;

import com.funtl.hello.spring.cloud.commons.redisson.annotation.RedissonLock;
import com.funtl.hello.spring.cloud.commons.redisson.enums.RedissonLockModel;
import com.funtl.hello.spring.cloud.commons.redisson.excepiton.RedissonLockException;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.annotation.Order;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
@Aspect
@Order(-10)
public class RedissonLockAop {

    /**
     * 等待加锁超时时间 -1 一直等待
     */
    private static final Long ATTEMPT_TIMEOUT = 10000L;

    /**
     * 看门狗
     * 在一个分布式环境下,多个服务实例请求获取锁,其中服务实例 A 成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了可以采用锁超时机制解决
     * 如果服务实例 A 没有宕机但是业务执行还没有结束,锁释放掉了就会导致线程问题(误删锁)。此时就一定要实现自动延长锁有效期的机制
     * 看门狗的主要作用:只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中
     */
    private static final Long LOCK_WATCH_DOG_TIMEOUT = 30000L;

    private RedissonLockModel lockModel;

    @Autowired
    private RedissonClient redissonClient;

    @Pointcut("@annotation(redissonLock)")
    public void controllerAspect(RedissonLock redissonLock) {
    }

    /**
     * 通过 Spring SpEL 获取参数
     *
     * @param key            定义的 key值以 # 开头 例如:#user
     * @param parameterNames 形参
     * @param values         形参值
     * @param keyConstant    key的常亮
     * @return
     */
    public List<String> getVauleBySpel(String key, String[] parameterNames, Object[] values, String keyConstant) {
        List<String> keys = new ArrayList<>();
        if (!key.contains("#")) {
            String s = "redisson:lock:" + key + keyConstant;
            log.info("没有使用 SpEL 表达式 value -> {}", s);
            keys.add(s);
            return keys;
        }
        // SpEL 解析器
        ExpressionParser parser = new SpelExpressionParser();
        // SpEL 上下文
        EvaluationContext context = new StandardEvaluationContext();
        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], values[i]);
        }
        Expression expression = parser.parseExpression(key);
        Object value = expression.getValue(context);
        if (value != null) {
            if (value instanceof List) {
                List value1 = (List) value;
                for (Object o : value1) {
                    keys.add("redisson:lock:" + o.toString() + keyConstant);
                }
            } else if (value.getClass().isArray()) {
                Object[] obj = (Object[]) value;
                for (Object o : obj) {
                    keys.add("redisson:lock:" + o.toString() + keyConstant);
                }
            } else {
                keys.add("redisson:lock:" + value.toString() + keyConstant);
            }
        }
        log.info("SpEL 表达式 key={}, value={}", key, keys);
        return keys;
    }

    @Around("controllerAspect(redissonLock)")
    public Object aroundAdvice(ProceedingJoinPoint proceedingJoinPoint, RedissonLock redissonLock) throws Throwable {
        String[] keys = redissonLock.keys();
        if (keys.length == 0) {
            throw new RuntimeException("keys 不能为空");
        }
        String[] parameterNames = new LocalVariableTableParameterNameDiscoverer().getParameterNames(((MethodSignature) proceedingJoinPoint.getSignature()).getMethod());
        Object[] args = proceedingJoinPoint.getArgs();

        long attemptTimeout = redissonLock.attemptTimeout();
        if (attemptTimeout == 0) {
            attemptTimeout = ATTEMPT_TIMEOUT;
        }
        long lockWatchdogTimeout = redissonLock.lockWatchdogTimeout();
        if (lockWatchdogTimeout == 0) {
            lockWatchdogTimeout = LOCK_WATCH_DOG_TIMEOUT;
        }
        RedissonLockModel lockModel = redissonLock.lockModel();
        if (lockModel.equals(RedissonLockModel.AUTO)) {
            RedissonLockModel lockModel1 = lockModel;
            if (lockModel1 != null && !lockModel1.equals(RedissonLockModel.AUTO)) {
                lockModel = lockModel1;
            } else if (keys.length > 1) {
                lockModel = RedissonLockModel.REDLOCK;
            } else {
                lockModel = RedissonLockModel.REENTRANT;
            }
        }
        if (!lockModel.equals(RedissonLockModel.MULTIPLE) && !lockModel.equals(RedissonLockModel.REDLOCK) && keys.length > 1) {
            throw new RuntimeException("参数有多个, 锁模式为 -> " + lockModel.name() + ".无法锁定");
        }
        log.info("锁模式 -> {}, 等待锁定时间 -> {} 秒.锁定最长时间 -> {} 秒", lockModel.name(), attemptTimeout / 1000, lockWatchdogTimeout / 1000);
        boolean res = false;
        RLock rLock = null;
        // 一直等待加锁
        switch (lockModel) {
            case FAIR:
                rLock = redissonClient.getFairLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
                break;
            case REDLOCK:
                List<RLock> rLocks = new ArrayList<>();
                for (String key : keys) {
                    List<String> vauleBySpel = getVauleBySpel(key, parameterNames, args, redissonLock.keyConstant());
                    for (String s : vauleBySpel) {
                        rLocks.add(redissonClient.getLock(s));
                    }
                }
                RLock[] locks = new RLock[rLocks.size()];
                int index = 0;
                for (RLock r : rLocks) {
                    locks[index++] = r;
                }
                rLock = new RedissonRedLock(locks);
                break;
            case MULTIPLE:
                rLocks = new ArrayList<>();

                for (String key : keys) {
                    List<String> vauleBySpel = getVauleBySpel(key, parameterNames, args, redissonLock.keyConstant());
                    for (String s : vauleBySpel) {
                        rLocks.add(redissonClient.getLock(s));
                    }
                }
                locks = new RLock[rLocks.size()];
                index = 0;
                for (RLock r : rLocks) {
                    locks[index++] = r;
                }
                rLock = new RedissonMultiLock(locks);
                break;
            case REENTRANT:
                List<String> vauleBySpel = getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant());
                //如果spel表达式是数组或者LIST 则使用红锁
                if (vauleBySpel.size() == 1) {
                    rLock = redissonClient.getLock(vauleBySpel.get(0));
                } else {
                    locks = new RLock[vauleBySpel.size()];
                    index = 0;
                    for (String s : vauleBySpel) {
                        locks[index++] = redissonClient.getLock(s);
                    }
                    rLock = new RedissonRedLock(locks);
                }
                break;
            case READ:
                RReadWriteLock rwlock = redissonClient.getReadWriteLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
                rLock = rwlock.readLock();
                break;
            case WRITE:
                RReadWriteLock rwlock1 = redissonClient.getReadWriteLock(getVauleBySpel(keys[0], parameterNames, args, redissonLock.keyConstant()).get(0));
                rLock = rwlock1.writeLock();
                break;
        }

        // 执行 AOP
        if (rLock != null) {
            try {
                if (attemptTimeout == -1) {
                    res = true;
                    // 一直等待加锁
                    rLock.lock(lockWatchdogTimeout, TimeUnit.MILLISECONDS);
                } else {
                    res = rLock.tryLock(attemptTimeout, lockWatchdogTimeout, TimeUnit.MILLISECONDS);
                }
                if (res) {
                    Object obj = proceedingJoinPoint.proceed();
                    return obj;
                } else {
                    throw new RedissonLockException("获取锁失败");
                }
            } finally {
                if (res) {
                    rLock.unlock();
                }
            }
        }
        throw new RedissonLockException("获取锁失败");
    }

}
@Bean
@ConditionalOnMissingBean(RedissonLockAop.class)
public RedissonLockAop redissonLockAop() {
    return new RedissonLockAop();
}
@RedissonLock(keys = "#tbItem.name", lockModel = RedissonLockModel.AUTO)
@PostMapping(value = "num/annotation")
public String testNumAnnotation(TbItem tbItem) {
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "ok";
}
锁模式 -> REENTRANT, 等待锁定时间 -> 10 秒.锁定最长时间 -> 30 秒
SpEL 表达式 key=#tbItem.name, value=[redisson:lock:Apple]

注意: 记得同时通过 Redis 客户端工具观察数据变化

特别说明: 本人平时混迹于 B 站,不咋回复这里的评论,有问题可以到 B 站视频评论区留言找我
视频地址: https://space.bilibili.com/31137138/favlist?fid=326428938
课件说明: 本次提供的课件是 Spring Cloud Netflix 版微服务架构指南,如果有兴趣想要学习 Spring Cloud Alibaba 版,可以前往 http://www.qfdmy.com 查看相关课程资源
案例代码: https://github.com/topsale/hello-spring-cloud-netflix

上一篇下一篇

猜你喜欢

热点阅读