分布式锁系列(1) 基于 DB & ZK
2020-02-17 本文已影响0人
suxin1932
1.概述
1.1 什么是分布式
#保证最终一致性与高可用性
分布式的 CAP 理论告诉我们:
任何一个分布式系统都无法同时满足一致性(Consistency), 可用性(Availability)和分区容错性(Partition tolerance), 最多只能同时满足两项。
目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。
基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。
在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。
有些场景例外(如金融场景)。
#分布式场景
此处主要指集群模式下,多个相同服务同时开启。
在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,
比如分布式事务、分布式锁等。
很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。
1) 在单机环境中,通过 Java 提供的并发 API (synchronized, ReentrantLock) 我们可以解决,但是在分布式环境下,复杂的多。
2) 分布式与单机情况下最大的不同在于其不是多线程而是"多进程"。
多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。
而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。
1.2 什么是锁
1.在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,
就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。
2.而同步的本质是通过锁来实现的。
为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,
那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,
其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。
这个标记可以理解为锁。
3.不同地方实现锁的方式也不一样,只要能满足所有线程都能看得到标记即可。
如 Java 中 synchronized 是在对象头设置标记,
Lock 接口的实现类基本上都只是某一个 volatile 修饰的 int 型变量其保证每个线程都能拥有对该 int 的可见性和原子修改,
linux 内核中也是利用互斥量或信号量等内存数据做标记。
4.除了利用内存数据做锁其实任何互斥的都能做锁(只考虑互斥情况),
如流水表中流水号与时间结合做幂等校验可以看作是一个不会释放的锁,
或者使用某个文件是否存在作为锁等。
只需要满足在对标记进行修改能保证原子性和内存可见性即可。
1.3 什么是分布式锁
1.当在分布式模型下,数据只有一份(或有限制), 此时需要利用锁的技术控制某一时刻修改数据的进程数。
2.与单机模式下的锁不同, 分布式锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。
分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠。
3.分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis, Memcache。
至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。
2.几种实现方式
2.1 基于数据库
https://www.jianshu.com/p/f65efc4cd860 (关于数据库锁可参考该文)
2.1.1 基于数据库的乐观锁
#基于表字段版本号做分布式锁
---- 为每个表设计一个版本号字段 ----
这个策略源于 mysql 的 mvcc 机制,使用这个策略其实本身没有什么问题,
唯一的问题就是对数据表侵入较大,在高并发的要求下,对数据库连接的开销也是无法忍受的。
#基本流程
当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,
如果要对读出来的数据进行更新后写回数据库,则需要将version加1,
同时将新的数据与新的version更新到数据表中,
且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,
如果是,则正常更新。
如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。
#具体demo
1.设计tb_test表有id, money, version三个字段
2.更新时:
1) 先读tb_test表的数据, 得到 id=id1, version=v1
select id, name, money from tb_test
2) 每次更新task表中的value字段时, 为了防止发生冲突, 需要这样操作
update tb_test set money=newMoney,version=v1+1 where id=id1 and version=v1
成功, 则成功, 失败则表明失败.
#缺点
1.这种操作方式,使原本一次的update操作,必须变为2次操作:
select版本号一次;update一次。增加了数据库操作的次数。
2.如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,
那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,
这个在实际使用场景中肯定是无法满足的。
而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的。
3.乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。
在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,
如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,
而不是将数据库表直接对外公开。
2.1.2 基于数据库的悲观锁
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁
(注意:
InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。
这里我们希望使用行级锁,就要给要执行的方法字段名添加索引,
值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。
重载方法的话建议把参数类型也加上。)
当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排他锁的线程即可获得分布式锁,当获取到锁之后,
可以执行方法的业务逻辑,执行完方法之后,通过connection.commit()操作来释放锁。
#具体demo
/**
* 超时获取锁
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {
String sql = "SELECT id from test_lock where id = ? for UPDATE ";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
connection.setAutoCommit(false);
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
if (ifsucess)
return true;
} catch (SQLException e) {
e.printStackTrace();
}
latch.await(timerange, TimeUnit.MILLISECONDS);
ranmain = futureTime - System.currentTimeMillis();
if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}
continue;
}
return false;
}
/**
* 释放锁
* @param lockID
* @return
* @throws SQLException
*/
public void unlockforUpdtate(String lockID) throws SQLException {
connection.commit();
}
2.2 基于 Redis实现分布式锁
https://www.jianshu.com/p/b588e9ca6d43
2.3 基于Zookeeper实现分布式锁
#### zookeeper 锁相关基础知识
>> zk 一般由多个节点构成(单数),采用 zab 一致性协议。
因此可以将 zk 看成一个单点结构,对其修改数据其内部自动将所有节点数据进行修改而后才提供查询服务。
>> zk 的数据以目录树的形式,每个目录称为 znode,
znode 中可存储数据(一般不超过 1M),还可以在其中增加子节点。
>> 子节点有三种类型。
序列化节点,每在该节点下增加一个节点自动给该节点的名称上自增。
临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除。
普通节点。
>> Watch 机制,client 可以监控每个节点的变化,当产生变化会给 client 产生一个事件。
#大致思想
每个客户端对某个方法加锁时,在 Zookeeper 上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。
判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
当释放锁的时候,只需将这个临时节点删除即可。
同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
#### zk 基本锁
>> 原理:利用临时节点与 watch 机制。
每个锁占用一个普通节点 /lock,当需要获取锁时在 /lock 目录下创建一个临时节点,
创建成功则表示获取锁成功,失败则 watch/lock 节点,有删除操作后再去争锁。
临时节点好处在于当进程挂掉后能自动上锁的节点自动删除即取消锁。
>> 缺点:所有取锁失败的进程都监听父节点,很容易发生羊群效应,
即当释放锁后所有等待进程一起来创建节点,并发量很大。
#### zk 锁优化
>> 原理:上锁改为创建临时有序节点,每个上锁的节点均能创建节点成功,只是其序号不同。
只有序号最小的可以拥有锁,如果这个节点序号不是最小的则 watch 序号比本身小的前一个节点 (公平锁)。
>> 步骤:
1.在 /lock 节点下创建一个有序临时节点 (EPHEMERAL_SEQUENTIAL)。
2.判断创建的节点序号是否最小,如果是最小则获取锁成功。
不是则取锁失败,然后 watch 序号比本身小的前一个节点。
3.当取锁失败,设置 watch 后则等待 watch 事件到来后,再次判断是否序号最小。
4.取锁成功则执行代码,最后释放锁(删除该节点)。
#优缺点
>> 优点:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
>> 缺点:
性能上可能并没有缓存服务那么高,因为每次在创建锁和释放锁的过程中,
都要动态创建、销毁临时节点来实现锁功能。
ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同步到所有的 Follower 机器上。
2.3.1 Zookeeper 如何实现分布式锁
排他锁
排他锁,又称写锁或独占锁。
如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取或更新操作,
其他任务事务都不能对这个数据对象进行任何操作,直到T1释放了排他锁。
排他锁核心是保证当前有且仅有一个事务获得锁,并且锁释放之后,
所有正在等待获取锁的事务都能够被通知到。
Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,
即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。
可以利用Zookeeper这个特性,实现排他锁。
#基本步骤
1.定义锁:
通过Zookeeper上的数据节点来表示一个锁
2.获取锁:
客户端通过调用 create 方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,
同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况。
3.释放锁:以下两种情况都可以让锁释放
>> 当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
>> 正常执行完业务逻辑,客户端主动删除自己创建的临时节点
基于Zookeeper实现排他锁流程.png
共享锁
共享锁,又称读锁。
如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,
其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都被释放。
共享锁与排他锁的区别在于,
加了排他锁之后,数据对象只对当前事务可见,
而加了共享锁之后,数据对象对所有事务都可见。
#基本步骤
1.定义锁:
通过Zookeeper上的数据节点来表示一个锁,是一个类似于
/lockpath/[hostname]-请求类型-序号 的临时顺序节点。
2.获取锁:
客户端通过调用 create 方法创建表示锁的临时顺序节点,
如果是读请求,则创建 /lockpath/[hostname]-R-序号 节点,
如果是写请求则创建 /lockpath/[hostname]-W-序号 节点。
>> 判断读写顺序:大概分为4个步骤
1) 创建完节点后,获取 /lockpath 节点下的所有子节点,并对该节点注册子节点变更的Watcher监听
2) 确定自己的节点序号在所有子节点中的顺序
3.1) 对于读请求:
>> 如果没有比自己序号更小的子节点,或者比自己序号小的子节点都是读请求,
那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑
>> 如果有比自己序号小的子节点有写请求,那么等待
3.2) 对于写请求,如果自己不是序号最小的节点,那么等待
4) 接收到Watcher通知后,重复步骤1)
3.释放锁:与排他锁逻辑一致
>> 当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
>> 正常执行完业务逻辑,客户端主动删除自己创建的临时节点
Zookeeper实现共享锁节点树.png
基于Zookeeper实现共享锁流程.png
羊群效应
在实现共享锁的 "判断读写顺序" 的第1个步骤是:
创建完节点后,获取 /lockpath 节点下的所有子节点,并对该节点注册子节点变更的Watcher监听。
这样的话,任何一次客户端移除共享锁之后,Zookeeper将会发送子节点变更的Watcher通知给所有机器,
系统中将有大量的 "Watcher通知" 和 "子节点列表获取" 这个操作重复执行,
然后所有节点再判断自己是否是序号最小的节点(写请求)或者判断比自己序号小的子节点是否都是读请求(读请求),
从而继续等待下一次通知。
然而,这些重复操作很多都是 "无用的",
实际上每个锁竞争者只需要关注序号比自己小的那个节点是否存在即可。
当集群规模比较大时,这些 "无用的" 操作不仅会对Zookeeper造成巨大的性能影响和网络冲击,
更为严重的是,如果同一时间有多个客户端释放了共享锁,
Zookeeper服务器就会在短时间内向其余客户端发送大量的事件通知--
这就是所谓的 "羊群效应"。
共享锁: 改进后的分布式锁实现
1.客户端调用 create 方法创建一个类似于 /lockpath/[hostname]-请求类型-序号 的临时顺序节点
2.客户端调用 getChildren 方法获取所有已经创建的子节点列表(这里不注册任何Watcher)
3.如果无法获取任何共享锁,那么调用 exist 来对比自己小的那个节点注册Watcher
>> 读请求:向比自己序号小的最后一个写请求节点注册Watcher监听
>> 写请求:向比自己序号小的最后一个节点注册Watcher监听
4.等待Watcher监听,继续进入步骤2
Zookeeper羊群效应改进前后.png
2.3.2 基于Curator客户端实现分布式锁
zk客户端--curator--实现分布式锁.jpg InterProcessLock继承关系图.png#概述
Apache Curator是一个Zookeeper的开源客户端,它提供了Zookeeper各种应用场景
(Recipe,如共享锁服务、master选举、分布式计数器等)的抽象封装等。
#分布式锁
Curator提供的跟分布式锁相关的类有5种,分别是:
>> 1.Shared Reentrant Lock 可重入锁: 可重入锁重入几次需要释放几次
----> InterProcessMutex
>> 2.Shared Lock 共享不可重入锁: 不可重入的锁可能在一些情况导致死锁
----> InterProcessSemaphoreMutex
>> 3.Shared Reentrant Read Write Lock 可重入读写锁
----> InterProcessReadWriteLock & InterProcessMutex
>> 4.Shared Semaphore 信号量 ---->
----> InterProcessSemaphoreV2 - 信号量实现类
----> Lease - 租约(单个信号)
----> SharedCountReader - 计数器,用于计算最大租约数量
>> 5.Multi Shared Lock 多锁
----> InterProcessMultiLock - 对所对象实现类
----> InterProcessLock - 分布式锁接口类
前4种锁都是公平锁...
#关于错误处理
强烈推荐使用ConnectionStateListener处理连接状态的改变。
当连接LOST时你不再拥有锁。
package com.zy;
import com.zy.mapper.GoodsMapper;
import com.zy.zk.CuratorZkClient;
import com.zy.zk.URI;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RunWith(SpringRunner.class)
@SpringBootTest
@MapperScan("com.zy.mapper")
public class SpringBootDemo01ApplicationTests {
@Autowired
private GoodsMapper goodsMapper;
private static final Integer goodsId = 1;
private static final Integer countMinus = 2;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
private static final String lockPath = "/zkLock";
private static final int num = 4;
private static final CountDownLatch countDownLatch = new CountDownLatch(num);
/**
* 减库存: 这里最终 库存数 会 < 0
*
* @throws InterruptedException
*/
@Test
public void fn01() throws InterruptedException {
// 假定 goodsId=1 的商品, 库存数量是 16, 执行完该未加锁方法后, 库存可能小于 0
// 当然, 这里是模拟集群部署(即多台服务器时)的并发场景
// 此处由于是在本机模拟的(一台机器上, 故未加单机下的JVM同步锁, 如 synchronized, Lock 等)
// 这里模拟 10 台 业务服务器 同时发送减库存的请求
// 每次扣减 2 件商品, 因此数据库中设置商品数量 < 8, 这里设为 5, 观察执行完毕后会不会 < 0
for (int i = 0; i < num; i++) {
executorService.submit(() -> {
// 查库存
Integer count = goodsMapper.getCountByGoodsId(goodsId);
if (count - countMinus >= 0) {
// 减库存
goodsMapper.updateGoodsCountByGoodsId(goodsId, countMinus);
}
});
}
TimeUnit.SECONDS.sleep(1);
}
/**
* 减库存: 这里加分布式锁后, 进程(进程!进程!)之间也是安全的
* 即同一时刻, 所有 JVM 中, 最多只有一个线程可执行减库存操作, 库存数不会 < 0
*
* @throws InterruptedException 控制台打印结果显示: InterProcessMutex 是可重入的
* client#2第一次已获取互斥锁
* client#2第二次获取互斥锁
* client#2第二次互斥锁释放
* client#2第一次互斥锁释放org.apache.curator.framework.recipes.locks.InterProcessMutex@18147eac
* client#1第一次已获取互斥锁
* client#1第二次获取互斥锁
* client#1第二次互斥锁释放
* client#1第一次互斥锁释放org.apache.curator.framework.recipes.locks.InterProcessMutex@5744678d
* client#3第一次已获取互斥锁
* client#3第二次获取互斥锁
* client#3第二次互斥锁释放
* client#3第一次互斥锁释放org.apache.curator.framework.recipes.locks.InterProcessMutex@584df218
* client#0第一次已获取互斥锁
* client#0第二次获取互斥锁
* client#0第二次互斥锁释放
* client#0第一次互斥锁释放org.apache.curator.framework.recipes.locks.InterProcessMutex@410177b3
*/
@Test
public void fn02() throws InterruptedException {
// 这里模拟 提供分布式锁服务的 zk 集群, 下述的 10 台 业务服务器连接的同样的 zk 集群
CuratorZkClient client = new CuratorZkClient(new URI("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", null, null));
// 这里模拟 4 台 业务服务器 同时发送减库存的请求
// 每次扣减 2 件商品, 因此数据库中设置商品数量 < 8, 这里设为 5, 观察执行完毕后会不会 < 0
for (int i = 0; i < num; i++) {
final InterProcessMutex lock = new InterProcessMutex(client.getClient(), lockPath);
String clientNo = "client#" + i;
executorService.submit(() -> {
try {
// 加锁
boolean acquire = lock.acquire(10, TimeUnit.SECONDS);
if (!acquire) {
System.out.println(clientNo + "第一次无法获取互斥锁");
return;
}
System.out.println(clientNo + "第一次已获取互斥锁");
// 先查库存
Integer count = goodsMapper.getCountByGoodsId(goodsId);
if (count - countMinus >= 0) {
// 再减库存
goodsMapper.updateGoodsCountByGoodsId(goodsId, countMinus);
}
boolean againAcquire = lock.acquire(300, TimeUnit.MICROSECONDS);
if (!againAcquire) {
System.out.println(clientNo + "第二次无法获取互斥锁");
return;
}
System.out.println(clientNo + "第二次获取互斥锁");
lock.release();
System.out.println(clientNo + "第二次互斥锁释放");
} catch (Exception e) {
System.out.println(clientNo + e.getMessage());
} finally {
countDownLatch.countDown();
try {
lock.release();
System.out.println(clientNo + "第一次互斥锁释放" + lock);
} catch (Exception e) {
System.out.println(clientNo + e.getMessage() + "---");
}
}
});
}
countDownLatch.await();
}
/**
* 减库存: 这里加分布式锁后, 进程(进程!进程!)之间也是安全的
* 即同一时刻, 所有 JVM 中, 最多只有一个线程可执行减库存操作, 库存数不会 < 0
*
* @throws InterruptedException 控制台打印结果显示: InterProcessSemaphoreMutex 是不可重入的
* client#2第一次已获取互斥锁
* client#2第二次无法获取互斥锁
* client#2第一次互斥锁释放
* client#1第一次已获取互斥锁
* client#1第二次无法获取互斥锁
* client#1第一次互斥锁释放
* client#3第一次已获取互斥锁
* client#3第二次无法获取互斥锁
* client#3第一次互斥锁释放
* client#0第一次无法获取互斥锁
* client#0Not acquired---
*/
@Test
public void fn03() throws InterruptedException {
// 这里模拟 提供分布式锁服务的 zk 集群, 下述的 10 台 业务服务器连接的同样的 zk 集群
CuratorZkClient client = new CuratorZkClient(new URI("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", null, null));
// 这里模拟 4 台 业务服务器 同时发送减库存的请求
// 每次扣减 2 件商品, 因此数据库中设置商品数量 < 8, 这里设为 5, 观察执行完毕后会不会 < 0
for (int i = 0; i < 4; i++) {
final InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client.getClient(), lockPath);
String clientNo = "client#" + i;
executorService.submit(() -> {
try {
// 加锁
boolean acquire = lock.acquire(2, TimeUnit.SECONDS);
if (!acquire) {
System.out.println(clientNo + "第一次无法获取互斥锁");
return;
}
System.out.println(clientNo + "第一次已获取互斥锁");
// 先查库存
Integer count = goodsMapper.getCountByGoodsId(goodsId);
if (count - countMinus >= 0) {
// 再减库存
goodsMapper.updateGoodsCountByGoodsId(goodsId, countMinus);
}
boolean againAcquire = lock.acquire(300, TimeUnit.MICROSECONDS);
if (!againAcquire) {
System.out.println(clientNo + "第二次无法获取互斥锁");
return;
}
System.out.println(clientNo + "第二次获取互斥锁");
lock.release();
System.out.println(clientNo + "第二次互斥锁释放");
} catch (Exception e) {
System.out.println(clientNo + e.getMessage());
} finally {
countDownLatch.countDown();
try {
lock.release();
System.out.println(clientNo + "第一次互斥锁释放");
} catch (Exception e) {
System.out.println(clientNo + e.getMessage() + "---");
}
}
});
}
countDownLatch.await();
}
/**
* 减库存: 这里加分布式锁后, 进程(进程!进程!)之间也是安全的
* 即同一时刻, 所有 JVM 中, 最多只有一个线程可执行减库存操作, 库存数不会 < 0
*
* @throws InterruptedException 控制台打印结果显示如下: (InterProcessReadWriteLock 读写锁, 一波请求, 只有一个能获取写锁)
* client#0已获取写锁
* client#1无法获取写锁
* client#1You do not own the lock: /zkLock-----
* client#2无法获取写锁
* client#2You do not own the lock: /zkLock-----
* client#3无法获取写锁
* client#3You do not own the lock: /zkLock-----
* client#0写锁释放完毕org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock$InternalInterProcessMutex@1c5fbcfe
*/
@Test
public void fn04() throws InterruptedException {
// 这里模拟 提供分布式锁服务的 zk 集群, 下述的 10 台 业务服务器连接的同样的 zk 集群
CuratorZkClient client = new CuratorZkClient(new URI("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", null, null));
// 这里模拟 4 台 业务服务器 同时发送减库存的请求
// 每次扣减 2 件商品, 因此数据库中设置商品数量 < 8, 这里设为 5, 观察执行完毕后会不会 < 0
for (int i = 0; i < 4; i++) {
String clientNo = "client#" + i;
final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client.getClient(), lockPath);
final InterProcessMutex writeLock = lock.writeLock();
final InterProcessMutex readLock = lock.readLock();
executorService.submit(() -> {
try {
// 加锁: 要先加写锁, 再加读锁
boolean writeAcquire = writeLock.acquire(100, TimeUnit.MICROSECONDS);
if (!writeAcquire) {
System.out.println(clientNo + "无法获取写锁");
return;
}
System.out.println(clientNo + "已获取写锁");
/*boolean readAcquire = readLock.acquire(100, TimeUnit.MICROSECONDS);
if (!readAcquire) {
System.out.println(clientNo + "无法获取读锁");
return;
}
System.out.println(clientNo + "已获取读锁");*/
// 先查库存
Integer count = goodsMapper.getCountByGoodsId(goodsId);
if (count - countMinus >= 0) {
// 再减库存
goodsMapper.updateGoodsCountByGoodsId(goodsId, countMinus);
}
} catch (Exception e) {
System.out.println(clientNo + e.getMessage());
} finally {
countDownLatch.countDown();
try {
/*readLock.release();
System.out.println(clientNo + "读锁释放完毕" + readLock);*/
writeLock.release();
System.out.println(clientNo + "写锁释放完毕" + writeLock);
} catch (Exception e) {
System.out.println(clientNo + e.getMessage() + "-----");
}
}
});
}
countDownLatch.await();
}
}
参考资料
https://mp.weixin.qq.com/s/MnbXpKuJWheY5SmnHI8-Ag (分布式锁--非常详细)
http://www.54tianzhisheng.cn/2018/04/24/Distributed_lock/ (分布式锁--较全)
https://blog.csdn.net/lovexiaotaozi/article/details/83819916 (DB实现分布式锁)
https://www.jianshu.com/p/a974eec257e6 (Curator实现分布式锁--已参考)
https://www.jianshu.com/p/d12bf3f4017c (Curator分布式锁源码--已参考)
https://mp.weixin.qq.com/s/gUDMP5FVPfS7Id4IyHp02A (分布式锁)
https://www.cnblogs.com/thisiswhy/p/12499331.html
https://www.cnblogs.com/thisiswhy/p/12596069.html