多线程之 Java 锁的基架 AQS(一)
2019-10-21 本文已影响0人
Unyielding_L
简述
提供一个框架来实现基于先入先出等待队列的阻塞锁和相关同步器(信号量,事件等)
。该类被设计成多种类型同步器的有用基础,这些同步器使用一个简单原子整数值表示状态
。这个简单原子整数是核心,它是用volatile
关键字修饰,改变其値是通过CAS方式去修改的。子类必须定义改变这个状态
的受保护的方法,这些方法的定义意味着被获取或者被释放对象的意义是什么。因此这个类的其他方法实现了所有的排队和阻塞机制。
独占模式和共享模式
独占模式
:当以独占模式获取时,其它线程尝试将无法成功。共享模式
:多个线程尝试时可能(不一定)成功(Semaphore 当许可证
获取完,再有线程尝试获取就不会成功)。这个类不“理解”除了在机械意义上的这些差异之外,当共享模式获取成功时,下一个等待的线程(如果存在的话)也必须确定它是否也可以获取。在不同的模式下,等待的线程共享一个先入先出队列
。通常情况实现类只支持一种模式,但是同样可以支持两种模式比如读写锁
。
如何使用该类
- 定义一个类,为了描述就叫
Mutex
(下文的例子) - 在该类内部定义一个嵌套类继承AQS,为了描述就叫
Sync
- 使用这个
Sync
当做同步器的基础,重新定义下面这些方法,根据实际情况,使用getState
、
setState
和compareAndSetState
来检查和修改同步状态- tryAcquire -
尝试获取独占锁,若锁空闲返回true,否则返回false
- tryRelease -
尝试释放状态来释放锁
- tryAcquireShared -
尝试以共享模式获取。该方法应该查询对象的状态是否允许在共享模式下获取它,如果允许,则应该获取它
- tryReleaseShared-
尝试设置state来释放锁
- isHeldExclusively
是否以独占模式拥有锁
- tryAcquire -
须知
:
这些方法中的每一个都默认抛UnsupportedOperationException异常,这些方法的内部实现必须是内部线程安全的,
并且通常应该是非阻塞的。定义这些方法就是使用这个类唯一支持的方式。
这个类的其他方法都被定义为final的,因为它们不可能被独立的改变。
即使这个类基于一个内部FIFO队列,它也不会自动地执行FIFO获取策略。 互斥同步的核心形式(伪代码):
Acquire:
while (!tryAcquire(arg)) {
如果线程还没有排队,则对其进行排队
可能会阻塞该该线程
}
Release:
if (tryRelease(arg))
取消第一个排队线程的的等待
在AQS里大致也是这么实现的,大家如果熟悉设计模式的话应该看出来这里就用了模板模式,AQS已经实现了骨架,子类只需填空
就行。
实战
这里是一个不可重入的互斥锁类,它使用值0表示解锁状态,使用值1表示锁定状态。 虽然非重入锁并不严格要求记录当前所有者线程,但是这个类这样做是为了更容易监视使用情况。
/**
* 这里是一个不可重入的互斥锁类,它使用值0表示解锁状态,使用值1表示锁定状态。
* 虽然非重入锁并不严格要求记录当前所有者线程,但是这个类这样做是为了更容易监视使用情况。
* @author liangziqiang
* @date 2019.10.19
*/
public class Mutex implements Lock, Serializable {
private final Sync sync = new Sync();
/**
* 继承AQS 当做内部帮助类
*/
private static class Sync extends AbstractQueuedSynchronizer{
/**
* 判断锁是否被占有
* @return 返回true:代表锁被持有,否则返回false
*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
/**
* 尝试获取锁
* 如果state为零 返回true
* @param acquires 只允许传1
*/
@Override
protected boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return Boolean.TRUE;
}
return Boolean.FALSE;
}
/**
* 是否锁
* 通过将state设置为零来释放锁
* @param releases 该值为1
* @return
*/
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 创建条件对象
* @return
*/
Condition newCondition() {return new ConditionObject();}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1);}
@Override
public boolean tryLock() { return sync.tryAcquire(1); }
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}
@Override
public void unlock() {sync.release(1);}
@Override
public Condition newCondition() {return sync.newCondition();}
public boolean isHoldTheLock() {return sync.isHeldExclusively();}
}
为了验证上面的正确性,我写了个单元测试一下,代码如下:
public class MutexTest {
private Mutex mutex;
@BeforeEach
public void init() {
mutex = new Mutex();
}
@Test
public void lock() {
mutex.lock();
assertTrue(mutex.isHoldTheLock());
mutex.unlock();
}
@Test
public void lockInterruptibly() {
mutex.lock();
Thread testThread = new Thread(() -> {
try {
mutex.lockInterruptibly();
} catch (InterruptedException e) {
assertFalse(mutex.isHoldTheLock());
System.out.println("abort some");
}
});
testThread.start();
testThread.interrupt();
mutex.unlock();
}
@Test
public void tryLock() {
boolean lock = mutex.tryLock();
Assertions.assertTrue(lock);
mutex.unlock();
}
@Test
public void unlock() {
mutex.lock();
mutex.unlock();
assertFalse(mutex.isHoldTheLock());
}
}
后记
这篇文章只是对AQS 使用方法的介绍,并未深入探索,但是也为读者打开了一道通往AQS 原理的大门,大家说是不是呢。