Semaphore 源码分析
需要提前了解的知识点: AbstractQueuedSynchronizer 实现原理
类介绍
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。比如控制用户的访问量,同一时刻只允许1000个用户同时使用系统,如果超过1000个并发,则需要等待。
使用场景
比如模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时如果同时来了两辆车,看门人允许它们进入停车场,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。
public class SemaphoreDemo {
private static Semaphore s = new Semaphore(2);
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
pool.submit(new ParkTask("1"));
pool.submit(new ParkTask("2"));
pool.submit(new ParkTask("3"));
pool.submit(new ParkTask("4"));
pool.submit(new ParkTask("5"));
pool.submit(new ParkTask("6"));
pool.shutdown();
}
static class ParkTask implements Runnable {
private String name;
public ParkTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
s.acquire();
System.out.println("Thread "+this.name+" start...");
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}
}
}
Semaphore 源码分析
Semaphore 通过使用内部类Sync继承AQS来实现。
支持公平锁和非公平锁。内部使用的AQS的共享锁。
具体实现可参考 AbstractQueuedSynchronizer 源码分析
Semaphore 的结构如下:
![](https://img.haomeiwen.com/i2843224/c7695d1692a9a0d7.png)
Semaphore构造
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
构造方法指定信号量的许可数量,默认采用的是非公平锁,也只可以指定为公平锁。
permits赋值给AQS中的state变量。
acquire:可响应中断的获得信号量
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
获得信号量方法,这两个方法支持 Interrupt中断机制,可使用acquire() 方法每次获取一个信号量,也可以使用acquire(int permits) 方法获取指定数量的信号量 。
acquire:不可响应中断的获取信号量
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
这两个方法不响应Interrupt中断机制,其它功能同acquire方法机制。
tryAcquire 方法,尝试获得信号量
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
尝试获得信号量有三个方法。
- 尝试获取信号量,如果获取成功则返回true,否则马上返回false,不会阻塞当前线程。
- 尝试获取信号量,如果在指定的时间内获得信号量,则返回true,否则返回false
- 尝试获取指定数量的信号量,如果在指定的时间内获得信号量,则返回true,否则返回false。
release 释放信号量
public void release() {
sync.releaseShared(1);
}
调用AQS中的releaseShared方法,使得state每次减一来控制信号量。
availablePermits方法,获取当前剩余的信号量数量
public int availablePermits() {
return sync.getPermits();
}
//=========Sync类========
final int getPermits() {
return getState();
}
该方法返回AQS中state变量的值,当前剩余的信号量个数
drainPermits方法
public int drainPermits() {
return sync.drainPermits();
}
//=========Sync类========
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
获取并返回立即可用的所有许可。Sync类的drainPermits方法,获取1个信号量后将可用的信号量个数置为0。例如总共有10个信号量,已经使用了5个,再调用drainPermits方法后,可以获得一个信号量,剩余4个信号量就消失了,总共可用的信号量就变成6个了。
reducePermits 方法
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//=========Sync类========
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
该方法是protected 方法,减少信号量个数
判断AQS等待队列中是否还有Node
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
//=========AbstractQueuedSynchronizer类========
public final boolean hasQueuedThreads() {
//头结点不等于尾节点就说明链表中还有元素
return head != tail;
}
getQueuedThreads方法
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//=========AbstractQueuedSynchronizer类========
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
该方法获取AQS中等待队列中所有未获取信号量的线程相关的信息(等待获取信号量的线程相关信息)。
想了解更多精彩内容请关注我的公众号
![](https://img.haomeiwen.com/i2843224/af50fe51e979ebd0.png)