互联网通用技术我爱编程

分布式锁常见方案

2018-03-09  本文已影响61人  菜鸟小玄

      最近碰到几个业务场景,会遇到并发的问题。在单实例情况下,我们会通过java.util.concurrent包下的API或者synchronized关键字来处理。但是集群部署情况,我们就要利用分布式锁来解决了。

分布式与单机情况下最大的不同在于其不是多线程而是多进程。

多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。

与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(我觉得分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠)

分布式锁的要求:

可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。

这把锁要是一把可重入锁(避免死锁)

这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)

有高可用的获取锁和释放锁功能

获取锁和释放锁的性能要好

这篇文章把我以前用到分布式锁方案和现在调研到的方案做一个总结。

分布式锁常用的方案

基于数据库

基于分布式缓存(redis、memcached、日本人Mikio Hirabayashi 开发的ttserver、公司的tair等等)

基于zookeeper

本人目前只用过基于数据库、zookeeper、redis的分布式锁

再说一句,要基于你的业务场景选择合适方案。

一、基于数据库的分布式锁

1、相关基础知识了解

再谈数据库的分布式锁之前,先了解一下相关基础知识。

(1)事务

事务是不能解决分布式锁要解决的问题的,不过在某些情况下事务也可以解决分布式锁要解决的问题。

事务是一组原子性sql查询语句,被当作一个工作单元。若MySQL对改事务单元内的所有sql语句都正常的执行完,则事务操作视为成功,所有的sql语句才对数据生效,若sql中任意不能执行或出错则事务操作失败,所有对数据的操作则无效(通过回滚恢复数据)。事务有四个属性:

a、原子性:事务被认为不可分的一个工作单元,要么全部正常执行,要么全部不执行。

b、一致性:事务操作对数据库总是从一种一致性的状态转换成另外一种一致性状态。

c、隔离性:一个事务的操作结果在内部一致,可见,而对除自己以外的事务是不可见的。

d、永久性:事务在未提交前数据一般情况下可以回滚恢复数据,一旦提交(commit)数据的改变则变成永久(当然用update肯定还能修改)。

事务的4个隔离级别:

a、Read uncommitted

读未提交,顾名思义,就是一个事务可以读取另一个未提交事务的数据。

实际中不会使用

b、Read committed

读提交,顾名思义,就是一个事务要等另一个事务提交后才能读取数据。

针对当前读,加行锁。存在幻读情况。 

c、Repeatable read

读提交,顾名思义,就是一个事务要等另一个事务提交后才能读取数据。

针对当前读,加行锁,加间隙锁(GAP锁)。所以不存在幻读情况。

d、Serializable

Serializable 是最高的事务隔离级别,在该级别下,事务串行化顺序执行,可以避免脏读、不可重复读与幻读。但是这种事务隔离级别效率低下,比较耗数据库性能,一般不使用。

从MVCC退回到基于锁的并发控制。读加共享锁,写加排它锁,且读写冲突,还会锁定这个范围。并发性能急剧下降,实际中不会使用。

MYSAM 引擎的数据库不支持事务,所以事务最好不要对混合引擎(如INNODB 、MYISAM)操作,若能正常运行且是你想要的最好,否则事务中对非支持事务表的操作是不能回滚恢复的。

(3)MVCC

MVCC是一种多版本并发控制机制。

解决的问题?

大多数的MYSQL事务型存储引擎,如,InnoDB,Falcon以及PBXT都不使用一种简单的行锁机制.事实上,他们都和MVCC–多版本并发控制来一起使用。

大家都应该知道,锁机制可以控制并发操作,但是其系统开销较大,而MVCC可以在大多数情况下代替行级锁,使用MVCC,能降低其系统开销。

MVCC实现分析

MVCC是一种多版本并发控制机制。

解决的问题?

大多数的MYSQL事务型存储引擎,如,InnoDB,Falcon以及PBXT都不使用一种简单的行锁机制.事实上,他们都和MVCC–多版本并发控制来一起使用。

大家都应该知道,锁机制可以控制并发操作,但是其系统开销较大,而MVCC可以在大多数情况下代替行级锁,使用MVCC,能降低其系统开销。

MVCC是通过保存数据在某个时间点的快照来实现的,不同存储引擎的MVCC实现是不同的,典型的有乐观并发控制和悲观并发控制。

MVCC的最大好处是读不加锁,读写不冲突。这样在读多写少的应用中,极大的提高了并发性能。

MVCC中读又分为快照读和当前读。快照度顾名思义是读的快照版本,有可能是历史版本,不需要加锁。当前读是读取的当前版本,当前读返回的记录需要加锁,以保证其他事务不会并发修改。 

那么什时候是当前读,什么时候是快照读? 

快照读:简单的select语句。 

select * from table where 不加锁 

当前读:特殊的select语句。 

select * from table where .. for update X锁(排它锁) 

select * from table where .. lock in share mode S锁(共享锁) 

增删改操作。 

insert into table values (…) X锁 

update table set .. where .. X锁 

delete from table where .. X锁

下面我们通过InnoDB的MVCC实现来分析MVCC使怎样进行并发控制的. 

InnoDB的MVCC,是通过在每行记录后面保存两个隐藏的列来实现的,这两个列,分别保存了这个行的创建时间(create_time)和行的删除时间(delete_time)。这里存储的并不是实际的时间值,而是系统版本号(可以理解为事务的ID),每开始一个新的事务,系统版本号就会自动递增,事务开始时刻的系统版本号会作为事务的ID。下面看一下在REPEATABLE READ隔离级别下,MVCC具体是如何操作的。

假设有如下表test_mvcc:

idnamecreate_timedelete_time

1aaa1undefined

2bbb1undefined

3ccc1undefined

create_time和detete_time是隐藏的,通过select语句并看不到。

这里假设系统的版本号从1开始。(已经插入的3条数据,在一个事务中完成的)

初始状态:delete_time(删除版本)是未定义的,既没有被删除过。

select语句,InnoDB会根据以下两个条件检查每行记录:

a.InnoDB只会查找版本早于当前事务版本的数据行(也就是,行的系统版本号小于或等于事务的系统版本号),这样可以确保事务读取的行,要么是在事务开始前已经存在的,要么是事务自身插入或者修改过的。

b.行的删除版本要么未定义,要么大于当前事务版本号,这可以确保事务读取到的行,在事务开始之前未被删除.。

只有a,b同时满足的记录,才能返回作为查询结果。

delete语句,InnoDB会为删除的每一行保存当前系统的版本号(事务的ID)作为删除标识。

下面开始具体分析:

第二个事务,ID为2;

start transaction;

select * from test_mvcc;  //(1)

select * from test_mvcc;  //(2)

commit;

假设1

假设在执行这个事务ID为2的过程中,刚执行到(1),这时,有另一个事务ID为3往这个表里插入了一条数据; 

第三个事务ID为3;

start transaction;

insert into test_mvcc values(NULL,'ddd');//主键是自增的,所以用NULL

commit;

这时表中的数据如下:

idnamecreate_timedelete_time

1aaa1undefined

2bbb1undefined

3ccc1undefined

4ddd3undefined

然后接着执行事务2中的(2),由于id=4的数据的创建时间(事务ID为3),执行当前事务的ID为2,而InnoDB只会查找事务ID小于等于当前事务ID的数据行,所以id=4的数据行并不会在执行事务2中的(2)被检索出来,在事务2中的两条select 语句检索出来的数据都只会是下表:

idnamecreate_timedelete_time

1aaa1undefined

2bbb1undefined

3ccc1undefined

假设2

假设在执行事务ID为2的过程中,刚执行到(1),事务3执行完后,接着又执行了事务4; 

第四个事务:

start  transaction; 

delete from test_mvcc where id=1;

commit;

此时数据库中的表如下:

idnamecreate_timedelete_time

1aaa14

2bbb1undefined

3ccc1undefined

4ddd3undefined

接着执行事务ID为2的事务(2),根据SELECT 检索条件可以知道,它会检索创建时间(创建事务的ID)小于当前事务ID的行和删除时间(删除事务的ID)大于当前事务的行,而id=4的行上面已经说过,而id=1的行由于删除时间(删除事务的ID)大于当前事务的ID,所以事务2的(2)select * from test_mvcc也会把id=1的数据检索出来。所以事务2中的两条select 语句检索出来的数据都如下:

idnamecreate_timedelete_time

1aaa14

2bbb1undefined

3ccc1undefined

假设3

假设在执行完事务2的(1)后又执行,其它用户执行了事务3、4,这时又有一个用户对这张表执行了update操作: 

InnoDB执行update,实际上是新插入了一行记录,并保存其创建时间为当前事务的ID,同时保存当前事务ID到要update的行的删除时间。

第5个事务:

start  transaction;

update test_mvcc set name='eee' where id=2;

commit;

根据update的更新原则:会生成新的一行,并在原来要修改的列的删除时间列上添加本事务ID,得到表如下:

idnamecreate_timedelete_time

1aaa14

2bbb15

3ccc1undefined

4ddd3undefined

2eee5undefined

继续执行事务2的(2),根据select 语句的检索条件,得到下表:

idnamecreate_timedelete_time

1aaa14

2bbb15

3ccc1undefined

到此整个MVCC的实现分析就完了。

(3)相关锁

写锁:又称排他锁、X锁。若事务T对数据对象A加上X锁,事务T可以读A也可以修改A,其他事务不能再对A加任何锁,直到T释放A上的锁。这保证了其他事务在T释放A上的锁之前不能再读取和修改A。

读锁:也叫共享锁、S锁,若事务T对数据对象A加上S锁,则事务T可以读A但不能修改A,其他事务只能再对A加S锁,而不能加X锁,直到T释放A上的S 锁。这保证了其他事务可以读A,但在T释放A上的S锁之前不能对A做任何修改。

表锁:mysql大多数存储引擎都支持,是系统开销最低但并发性最低的一个锁策略。事务t对整个表加读锁,则其他事务可读不可写,若加写锁,则其他事务增删改都不行。

行级锁:又叫记录锁,操作对象是表中的一行,是MVCC技术用的比较多的。行级锁是在存储引擎中实现的,而不是在mysql服务器。行级锁对系统开销较大,处理高并发较好。我们常用的存储引擎innodb是支持行级锁的。

间隙锁:编程的思想源于生活,生活中的例子能帮助我们更好的理解一些编程中的思想。

生活中排队的场景,小明,小红,小花三个人依次站成一排,此时,如何让新来的小刚不能站在小红旁边,这时候只要将小红和她前面的小明之间的空隙封锁,将小红和她后面的小花之间的空隙封锁,那么小刚就不能站到小红的旁边。

这里的小红,小明,小花,小刚就是数据库的一条条记录。

他们之间的空隙也就是间隙,而封锁他们之间距离的锁,叫做间隙锁。

next-key锁:next-key锁其实包含了记录锁和间隙锁,即锁定一个范围,并且锁定记录本身,InnoDB默认加锁方式是next-key 锁。

(4)mysql中几个重要的log

binlog、redo log、undo log,其中redo和undo是跟事务紧密相关的。

binlog

binlog日志用于记录所有更新且提交了数据或者已经潜在更新提交了数据(例如,没有匹配任何行的一个DELETE)的所有语句。语句以“事件”的形式保存,它描述数据更改。

binlog是MySQL Server层记录的日志, redo log是InnoDB存储引擎层的日志。 两者都是记录了某些操作的日志,自然有些重复,但两者记录的格式不同。

-- binlog的作用:

a、恢复使能够最大可能地更新数据库,因为二进制日志包含备份后进行的所有更新。

b、在主复制服务器上记录所有将发送给从服务器的语句

undo log

Undo Log是为了实现事务的原子性,在MySQL数据库InnoDB存储引擎中,还用UndoLog来实现多版本并发控制(简称:MVCC)。

-- 原理:

Undo Log的原理很简单,为了满足事务的原子性,在操作任何数据之前,首先将数据备份到一个地方(这个存储数据备份的地方称为UndoLo)。

然后进行数据的修改。如果出现了错误或者用户执行了ROLLBACK语句,系统可以利用UndoLog中的备份将数据恢复到事务开始之前的状态。

除了可以保证事务的原子性,Undo Log也可以用来辅助完成事务的持久化。

-- 用Undo Log实现原子性和持久化的事务的简化过程:

假设有A、B两个数据,值分别为1,2。

A.事务开始.

B.记录A=1到undolog.

C.修改A=3.

D.记录B=2到undolog.

E.修改B=4.

F.将undolog写到磁盘。

G.将数据写到磁盘。

H.事务提交

这里有一个隐含的前提条件:数据都是先读到内存中,然后修改内存中的数据,最后将数据写回磁盘。

之所以能同时保证原子性和持久化,是因为以下特点:

A.更新数据前记录Undo log。

B.为了保证持久性,必须将数据在事务提交前写到磁盘。只要事务成功提交,数据必然已经持久化。

C.Undo log必须先于数据持久化到磁盘。如果在G,H之间系统崩溃,undo log是完整的,可以用来回滚事务。

D.如果在A-F之间系统崩溃,因为数据没有持久化到磁盘。所以磁盘上的数据还是保持在事务开始前的状态。

缺陷:每个事务提交前将数据和Undo Log写入磁盘,这样会导致大量的磁盘IO,因此性能很低。

如果能够将数据缓存一段时间,就能减少IO提高性能。但是这样就会丧失事务的持久性。因此引入了另外一种机制来实现持久化,即redo log

redo log

记录的是新数据的备份。在事务提交前,只要将Redo Log持久化即可,不需要将数据持久化。当系统崩溃时,虽然数据没有持久化,但是RedoLog已经持久化。系统可以根据RedoLog的内容,将所有数据恢复到最新的状态。

-- undo+redo联合后,事务的简化过程:

假设有A、B两个数据,值分别为1,2.

A.事务开始.

B.记录A=1到undolog.

C.修改A=3.

D.记录A=3到redolog.

E.记录B=2到undolog.

F.修改B=4.

G.记录B=4到redolog.

H.将redolog写入磁盘。

I.事务提交

-- undo+redo联合后,事务的特点:

A.为了保证持久性,必须在事务提交前将redoLog持久化。

B.数据不需要在事务提交前写入磁盘,而是缓存在内存中。

C.RedoLog保证事务的持久性。

D.UndoLog保证事务的原子性。

E.有一个隐含的特点,数据必须要晚于redolog写入持久存

2、非常规方案实现分布式锁

(1)for update方案

start transaction;

select id,name,application_count 

from campaign

Where id=1 for update;//X锁

...

...

...

执行一大堆业务逻辑;

...

...

update campaign 

Set application_count =application_count -1

Where id=1;

commit;

(2)类似乐观锁的方案

start transaction;

select id,name,application_count 

from campaign

Where id=1;

记录application_count到变量a中

...

...

...

执行一大堆业务逻辑;

...

...

update campaign 

Set application_count =application_count -1

Where id=1 and application_count=#{a};

commit;

3、乐观锁实现分布式锁

(1)定义

乐观的认为没有人使用对应的资源。

乐观锁是基于数据的版本号实现的。

在创建数据表的时候,给表增加一个字段version,每次读取的时候,把version读取出来,更新的时候,比较version是否一致,并把version加1。

有的时候,不用增加version字段,通过某个业务字段也可以做到。

但是如果不用version字段,有可能出现ABA的问题,如果你的业务字段不会出现这种情况或者业务场景允许出现ABA的问题,那就没有必要增加version字段。

(2)简单例子

假如有如下一张表user:

idnameageversion

1xxx201

2yyy201

更新user的业务场景如下: 你需要先从user表把数据查询出来,然后做一系列复杂的操作,最后更新对应的记录

查询:select id,name,age,version from user where id=1;

更新:update set name='zwf',version=2 where id=1 and version=1;如果发现version已经不是1了,说明已经有其他的事务进行了更新,id=1的这条记录并不会因为并发而被修改。

假如你的业务场景跟例子中的是一样的,而且也不需要增加version字段就可以实现乐观锁,那么用这种方式是最简单的。(不需要专门生成一个资源表,来映射user表中的每条记录,把记录当成资源;不用增加version字段,对user表的侵入性为0)

(3)通用实现方式,生成资源表

资源表定义

idresourcegmt_creategmt_modifystatusversion

112017-08-08 11:11:112017-08-08 11:11:11210

222017-08-08 11:11:112017-08-08 11:11:11111

resource:代表一个资源,具体代表什么看你的业务场景了(比如某个类中的某个方法、比如上面提到的user表中的一条记录)。

status:资源是否被锁定,1代表未被锁定,2代表锁定。

执行流程 a、先执行select操作查询当前数据的数据版本号,比如当前数据版本号是11:

select id, resource, state,version from resource where state=1 and id=2;

b、执行更新操作:

update resoure set state=2, version=12, update_time=now() where resource=2 and state=1 and version=11

c、如果上述update语句真正更新影响到了一行数据,那就说明占位成功。如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

d、如果已经占位成功,想要再次进入,可以通过select * from where resource=2 and state=2 and version=12,有数据代表可以进入。

(3)缺点

a、单点风险,一旦数据库挂掉,会导致业务系统不可用

b、这种操作方式,使原本一次的update操作,必须变为2次操作: select版本号一次;update一次。增加了数据库操作的次数。

c、如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,在高并发的要求下,对数据库连接的开销一定是无法忍受的。

d、乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中。在系统设计阶段,我们应该充分考虑到这些情况出现的可能性,并进行相应调整,如将乐观锁策略在数据库存储过程中实现,对外只开放基于此存储过程的数据更新途径,而不是将数据库表直接对外公开。

e、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。(可以通过定时任务来解决)

f、这把锁只能是非阻塞的,没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

5、排他锁(悲观锁)实现分布式锁

(1)定义

悲观锁是与乐观锁对应的,悲观的认为资源已经被别人抢占了。

(2)实现方式

首先创建一张数据表:

CREATE TABLE `methodLock` (

  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',

  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',

  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',

  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',

  PRIMARY KEY (`id`),

  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE

) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

方式一

当我们想要锁住某个方法时,执行以下SQL:

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

delete from methodLock where method_name ='method_name'

上面这种简单的实现有以下几个问题:

单点,一旦数据库挂掉,会导致业务系统不可用(解决:搞两个数据库,数据之前双向同步,一旦挂掉快速切换到备库上)。

这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁(解决:定时任务)。

这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作(解决:while循环,直到insert成功再返回成功)。

这把锁是不可重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了(解决:在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了)。

方式二

X锁实现: for update

我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。

基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:

start transaction

select * from methodLock where method_name=xxx for update;

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。(InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上)

解锁:commit transaction

使用这种方式可以有效的解决上面提到的无法释放锁和阻塞锁的问题。

阻塞锁? for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。

锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。

但是还是无法直接解决数据库单点和可重入问题。

这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。

还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。

4、数据库实现分布式锁-总结

优点:

简单、容易理解

缺点:

在高并发的时候,数据库连接数会不够用,性能上限很容易触及

处理阻塞、单点、锁超时等问题,会使方案非常复杂。

二、基于Zookeeper实现分布式锁

1、zookeeper简单介绍

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

https://zookeeper.apache.org/

2、实现原理

基于zookeeper临时有序节点可以实现的分布式锁。

大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。

判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。

当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。

3、实现方式

可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务(你也可以自己封装)。

public boolean tryLock(long timeout, TimeUnit unit) throws Exception {

        return interProcessMutex.acquire(timeout, unit);

}

public void unlock() throws Exception {

        interProcessMutex.release();

}

Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。

使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)

4、总结

优点:

锁无法释放问题解决,使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。

非阻塞锁问题解决,使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。

不可重入问题解决,使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。

单点问题解决,使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。

实现简单

缺点,性能不如缓存高

三、基于Tair实现分布式锁

1、tair简单介绍

https://www.atatech.org/articles/4743

2、实现

参考:https://www.atatech.org/articles/30653

本人没有在实际环境中使用过。

import com.taobao.tair.DataEntry;

import com.taobao.tair.Result;

import com.taobao.tair.ResultCode;

import com.taobao.tair.TairManager;

import org.apache.commons.lang.NotImplementedException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.slf4j.helpers.FormattingTuple;

import org.slf4j.helpers.MessageFormatter;

import javax.annotation.Resource;

import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class CommonLocker {

    private static final Logger logger = LoggerFactory.getLogger(CommonLocker.class);

    @Resource

    private TairManager ldbTairManager;

    private static final short NAMESPACE = 1310;

    private static CommonLocker locker;

    public void init() {

        if (locker != null) return;

        synchronized (CommonLocker.class) {

            if (locker == null)

                locker = this;

        }

    }

    public static Lock newLock(String format, Object... argArray) {

        FormattingTuple ft = MessageFormatter.arrayFormat(format, argArray);

        return newLock(ft.getMessage());

    }

    public static Lock newLock(String strKey) {

        String key = "_tl_" + strKey;

        return new TairLock(key, CommonConfig.lock_default_timeout);

    }

    public static Lock newLock(String strKey, int timeout) {

        String key = "_tl_" + strKey;

        return new TairLock(key, timeout);

    }

    private static class TairLock implements Lock {

        private String lockKey;

        private boolean gotLock = false;

        private int retryGet = 0;

        private int retryPut = 0;

        private int timeout;

        public TairLock(String key, int timeout) {

            this.lockKey = tokey(key);

            this.timeout = timeout;

        }

        public boolean tryLock() {

            return tryLock(timeout);

        }

        /**

        * need finally do unlock

        *

        * @return

        */

        public boolean tryLock(int timeout) {

            Result result = locker.ldbTairManager.get(NAMESPACE, lockKey);

            while (retryGet++ < CommonConfig.lock_get_max_retry &&

                    (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) // 重试一次

                result = locker.ldbTairManager.get(NAMESPACE, lockKey);

            if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // lock is free

                // 已验证version 2表示为空,若不是为空,则返回version error

                ResultCode code = locker.ldbTairManager.put(NAMESPACE, lockKey, locker.getValue(), 2, timeout);

                if (ResultCode.SUCCESS.equals(code)) {

                    gotLock = true;

                    return true;

                } else if (retryPut++ < CommonConfig.lock_put_max_retry &&

                        (code == null || ResultCode.CONNERROR.equals(code) || ResultCode.TIMEOUT.equals(code) || ResultCode.UNKNOW.equals(code))) { // 感谢剑痴指出错误

                    return tryLock(timeout);

                }

            } else if (result.getValue() != null && locker.getValue().equals(result.getValue().getValue())) {

// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。

                // 若是自己的锁,自己继续用

                gotLock = true;

                return true;

            }

            // 到这里表示没有拿到锁

            return false;

        }

        public void unlock() {

            if (gotLock) {

                ResultCode invalidCode = locker.ldbTairManager.invalid(NAMESPACE, lockKey);

                gotLock = false;

            }

        }

        public void lock() {

            throw new NotImplementedException();

        }

        public void lockInterruptibly() throws InterruptedException {

            throw new NotImplementedException();

        }

        public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {

            throw new NotImplementedException();

        }

        public Condition newCondition() {

            throw new NotImplementedException();

        }

    }

// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。

    private String getValue() {

        return getHostname() + ":" + Thread.currentThread().getName();

    }

    /**

    * 获得机器名

    *

    * @return

    */

    public static String getHostname() {

        try {

            return InetAddress.getLocalHost().getHostName();

        } catch (UnknownHostException e) {

            return "[unknown]";

        }

    }

    public void setLdbTairManager(TairManager ldbTairManager) {

        this.ldbTairManager = ldbTairManager;

    }

}

使用样例

Lock lockA = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getMaster().getUid());

Lock lockB = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getPartnerList().get(0).getUid());

try {

    if (lockA.tryLock() && lockB.tryLock()) {// 分布式锁定本任务

        // do something....

    }

} finally {

    lockB.unlock();

    lockA.unlock();

}

3、总结

整体上用到了tair的get、put、invalid三个方法,如果put失败或get失败,重试即可(根据业务场景来判断重试几次,不建议重试太多次,容易造成雪崩)

优点:

锁无法释放问题解决,通过tair的超时问题机制解决,不过超时时间需要根据业务场景来判断

可重入问题解决,让tair的value包含机器ip+线程name,获取锁的时候,先get value做检查是不是已经获取了锁

非阻塞问题解决(while重复执行)

高性能、高可用

缺点:通过时间控制失效不太靠谱

有写的不对的地方,欢迎大家拍砖

分布式与单机情况下最大的不同在于其不是多线程而是多进程

事务是不能解决分布式锁要解决的问题的,不过在某些情况下事务也可以解决分布式锁要解决的问题。

上一篇下一篇

猜你喜欢

热点阅读