Semaphore
Semaphore概念
semaphore,又名信号量,类似于“许可证”的概念,它实际上是维护了一些“许可证”,用来控制同时允许共享资源的最大线程数。比如疫情期间的图书馆,每天只允许一定数量的人进入,其他人来了之后就需要等待。
应用场景
Semaphore用作流量控制,特别是资源有限的情况下。常用来举例的一个场景是说,数据库的连接,比如我们要读取的数据量比较大,启动几十个线程并发读取,但是由于数据库连接数只有10个,所以这个时候需要Semaphore来控制最多10个线程来请求数据库。
代码示例
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。
Semaphore还提供一些其他方法
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。
Semaphore原理
Semaphore底层也是基于AQS分别实现了公平与非公平策略,需注意semaphore的锁是共享锁。
Semaphore源码分析
semaphore的两种构造函数
获取锁
- 接受一个许可数量的构造,默认是非公平
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
- 两个参数,其中一个是“许可证”数量,true代表公平策略
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
构造一个公平策略Semaphore,state设置为允许的最大许可量
//FairSync类
FairSync(int permits) {
super(permits);
}
//Sync类(FairSync类的父类)
Sync(int permits) {
setState(permits);
}
Semaphore公平策略源码解读
当线程调用acquire()方法时,入参"1"表示尝试获取1个许可
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly,线程中断,抛中断异常;尝试获取锁小于0,代表获取失败,加入等待队列
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//获取失败,加入等待队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared尝试获取锁,先判断队列中是否存在比当前线程等待时间长的线程;available获取可用的许可证数;remaining申请acquires数量后,剩余的可用许可证数;
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//先判断队列中是否存在比当前线程等待时间长的线程
return -1;
int available = getState();//获取可用的许可证数
int remaining = available - acquires;//申请acquires数量后,剩余的可用许可证数
//两种情况,1,是可用许可证没有了,那么返回剩余许可证数量;2,是许可证还有,CAS尝试更新成功后,返回剩余许可证数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
当tryAcquireShared(arg) < 0 获取锁失败,加入AQS等待队列,执行doAcquireSharedInterruptibly(arg)方法。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加一个节点到队列尾,这块的逻辑可以参考AbstractQueuedSynchronizer里的介绍
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//找当前节点的上一个节点
if (p == head) {//如果上一个节点正好是头节点
int r = tryAcquireShared(arg);//获取共享锁
if (r >= 0) {
setHeadAndPropagate(node, r);//将node节点设置为head节点,r>0说明还有机会获取到锁,唤醒后面的先从,称之为传播
p.next = null; // help GC
failed = false;
return;
}
}
//如果不是头节点,就不能获取
//对节点状态进行检查并更新状态,如果线程应该阻塞,返回true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//中断阻塞,并返回当前线程是否阻塞boolean值
throw new InterruptedException();
}
} finally {
if (failed)
//取消获取
cancelAcquire(node);
}
}
释放锁
release()方法用于锁的释放
/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads are trying to acquire a permit, then one is
* selected and given the permit that was just released. That thread
* is (re)enabled for thread scheduling purposes.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
public void release() {
sync.releaseShared(1);
}
releaseShared方法,如果锁释放成功,唤醒AQS等待队列中的head节点
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法其实是对state做加法运算
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;//加上释放的
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared方法唤醒后续线程节点可以来争取信号量
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
//获取head的状态
int ws = h.waitStatus;
///头节点线程状态为SIGNAL唤醒后续线程节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒下一个节点
unparkSuccessor(h);
}
//成功设置成 0 之后,将 head 状态设置成传播状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Semaphore非公平策略源码解读
与公平获取的区别是,无需判断队列,其他部分大致相同
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}