信号量-semaphore -05
并发中多个线程访问同一个资源可以通过同步解决,但是多个线程访问多个资源怎么解决呢?如公交车上有10个位置,但是现在有15个人,要控制这15个人来抢占位置。juc 引入semaphore 来解决这个问题。
/**
* 信号量:控制多线程对多个资源的访问
*/
public class SemaphoreDemo {
//设置10个许可,这个和线程池不一样,信号量里面并没有创建线程,而是通过 所谓的许可证(可理解为 token )来控制
// 拿到 token 的就可以访问资源,到 token 被拿完了,就会被放入等待队列
private static final Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) throws InterruptedException {
for (int i=0;i<20;i++) {
new Thread(()->{
try {
semaphore.acquire();//获取执行许可
System.out.println("执行线程=" + Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//释放
}
}).start();
}
}
}
jkaj4kz8mm.png
常用api
int availablePermits = semaphore.availablePermits();//可用 token
int drainPermits = semaphore.drainPermits();//获得并返回所有立即可用的许可证数量
int queueLength = semaphore.getQueueLength();//返回当前可能在阻塞获取许可证线程的数量
boolean hasQueuedThreads = semaphore.hasQueuedThreads();//查询是否有线程正在等待获取许可证
boolean fair = semaphore.isFair();//返回是否为公平模式
boolean tryAcquire = semaphore.tryAcquire();//非阻塞的获取一个许可证(token)
下面我们来稍微分析一下semaphore 的源码 。
//构造器,传入token 数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//构造器,传入token 数量,是否是公平锁、否公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
//父类构造器
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
- Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造器
Sync(int permits) {
// 使用 state 字段来作为 token 数量,在往上面就是 AQS ,暂时到这里。
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
通过上面的源码分析,我们看出 在我们 private static final Semaphore semaphore = new Semaphore(10); 时,设置了一个 state 字段来记录 token 的数量,也就时许可证的数量。下面我们来分析一下 semaphore.acquire(); 这个方法的实现。也就是现在我们的token 有了,就等 线程来取了, semaphore.acquire(); 就时获取token。
- semaphore.acquire()
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
// 进去,该方法在 AQS
sync.acquireSharedInterruptibly(1);
}
}
- AQS
// arg == 1
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判断当前线程是否中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)// tryAcquireShared(arg) 看一下这个方法
doAcquireSharedInterruptibly(arg);
}
- tryAcquireShared
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 因为前面我们初始化时选择的 是NonfairSync ,所以具体实现在这里 acquires = 1
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);// 看一下这个方法
}
}
- nonfairTryAcquireShared
// Semaphore 内部类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState(); // 我们初始化时 ,这个值 是10
}
// 这个方法就 线程来 获取token 的逻辑 acquires == 1
final int nonfairTryAcquireShared(int acquires) {
for (;;) {// 自旋
int available = getState(); // 如果是第一个线程来,这个值就是初始值,否则就是 剩下的 token 数量。
// 总的数量 - 1 ,每个 线程来都取走一个
int remaining = available - acquires;
// 这里出 for 有两个条件,remaining < 0 表示 token 数量没有了,
//试想一下,初始化 10 个token ,共有15 个 ,每来一个线程 就 减1,第10 个的时候 remaining = 0 ,
//这时刚好把token 用完,第11 个来时 就变成了 0-1 = -1 ,remaining = -1 ,并返回。
//在没有取完token 时,remaining < 0 这个条件不会满足。
if (remaining < 0 ||
compareAndSetState(available, remaining))// 第二个条件,CAS ,也就每次取走一个token ,就更新 state 的值,也就说剩下的token 数量。返回的是 10-1 = 9 (剩下的数量)
return remaining;
}
}
- compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this, 这句就调用的 unsafe 对象 ,本地方法栈里面的。
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
- unsafe.compareAndSwapInt(this, stateOffset, expect, update);
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
unsafe.compareAndSwapInt(this, stateOffset, expect, update);
this :需要更新字段所在的对象
stateOffset : 字段 --> state
expect : 期望值
update : 更新值
更新之前都会 比较 传入的 expect 值是否和原来的值 相同,相同就 更新。如 当前 state =10 ,expect = 10,update =9 ,则 state = 9。
到这里我们就指定了 在 semaphore 信号量中,上层 semaphore.acquire(); 方法就是将底层 AQS 中的 state 值做修改,而所谓的 获取 执行许可(token )就是将 state 的值 -1 ,直到 取完为止。那么 如果 state = 10 ,但是现在有 15 个线程需要取,剩下5个没有获取成功的 线程又去哪里了呢?下面分析下这个逻辑:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)// tryAcquireShared(arg) 这个方法关键。
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared(arg) : arg = 1 。前面我们说到这个方法 nonfairTryAcquireShared 里面是一个 自旋(for(;;)),里面有个if 条件判断,有两种 条件可以进去,下面我们再来看一看代码:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {// 自旋
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || // 条件 1
compareAndSetState(available, remaining))
return remaining;
}
}
remaining < 0 。分析一下这个条件 什么时候满足:
试想 : state = 10 ,总共有15 个线程需要获取 许可(token),
当前面 10 都获取 成功后 state = 0,这是第11 个 过来了,
int available = getState(); // available = state = 0
int remaining = available - acquires; // 0-1 , remaining = -1 < 0。
将 return -1。
回到 :
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 第11 个线程 满足 < 0 的条件。
doAcquireSharedInterruptibly(arg);
}
- doAcquireSharedInterruptibly(arg); arg = 1
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// addWaiter() 在队列里面 添加一个节点。新加入的节点再队尾,返回 的是 上一个节点,
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {// 自旋,依次 去队列里面的前一个,
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 再一次尝试 获取
if (r >= 0) { //获取成功,节点被剔除队列。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
final Node node = addWaiter(Node.SHARED); 在 队列里面添加一个节点,也就说 没有获取到 许可(token) 的线程 被 包装在一个 Node 对象里面,并 添加到了一个队列里面。
- addWaiter(Node node)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
到这里我们就知道了,semaphore 中 获取 token 就是每次 将 state 的值 -1 ,当 state 的值 >=0 时 表示 线程有获取资源的权力,当 state 被消耗完了,下一个线程再来的时候,就会被放到一个队列里面,等待下次获取。
- enq(node);
// 入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;// 队尾
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 当node 的 前一个 节点 设置为 t
if (compareAndSetTail(t, node)) { //设置 当前node 为 队尾
t.next = node; // 以前的 队尾 的下一个 节点 设置为 当前节点,当前节点 就成了对尾
return t; // Node t = tail;// 队尾
}
}
}
}