/**
 * Demonstrates throttling with a {@link Semaphore}: {@code THREAD_COUNT} tasks are
 * submitted to a pool, but only as many as there are permits (10) may be inside the
 * guarded section, printing "save data", at the same time.
 */
public class SemaphoreTest {
    /** Number of tasks submitted; also the size of the worker pool. */
    private static final int THREAD_COUNT = 30;

    /** Pool large enough that every task gets its own thread and contends for a permit. */
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    /** Ten permits: the maximum number of tasks allowed in the guarded section at once. */
    private static final Semaphore s = new Semaphore(10);

    /**
     * Submits {@code THREAD_COUNT} tasks that each acquire a permit, print, and
     * return the permit, then asks the pool to shut down once they have finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release;
                        // restore the interrupt flag for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        System.out.println("save data");
                    } finally {
                        // Always hand the permit back, even if the work above throws.
                        s.release();
                    }
                }
            });
        }
        // Already-submitted tasks still run; the pool just stops accepting new ones.
        threadPool.shutdown();
    }
}
虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection<Thread> getQueuedThreads() :返回所有正在等待获取许可证的线程集合。是个protected方法。
- 只接受许可证数量的构造方法,默认采用非公平策略
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
- 接受两个参数的构造方法:permits 是许可证数量;fair 为 true 时采用公平策略,为 false 时采用非公平策略
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and the given fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 * @param fair {@code true} if this semaphore will guarantee
 *        first-in first-out granting of permits under contention,
 *        else {@code false}
 */
public Semaphore(int permits, boolean fair) {
    // The fairness flag only selects which synchronizer implementation is used.
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
 * Creates the fair synchronizer; the permit count is handed to the
 * {@code Sync} superclass constructor.
 *
 * @param permits the initial number of permits
 */
FairSync(int permits) {
    super(permits);
}
/**
 * Stores the initial permit count in the AQS synchronization state, which
 * the acquire/release paths later read with {@code getState()}.
 *
 * @param permits the initial number of permits
 */
Sync(int permits) {
    setState(permits);
}
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
* @throws InterruptedException if the current thread is interrupted
public void acquire() throws InterruptedException {
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//先判断队列中是否存在比当前线程等待时间长的线程
return -1;
int available = getState();//获取可用的许可证数
int remaining = available - acquires;//申请acquires数量后,剩余的可用许可证数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
当 tryAcquireShared(arg) 的返回值小于 0 时,表示获取许可证失败,当前线程会执行 doAcquireSharedInterruptibly(arg) 方法,加入 AQS 等待队列。
* Acquires in shared interruptible mode.
* @param arg the acquire argument
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();//找当前节点的上一个节点
if (p == head) {//如果上一个节点正好是头节点
int r = tryAcquireShared(arg);//获取共享锁
if (r >= 0) {
setHeadAndPropagate(node, r);//将node节点设置为head节点,r>0说明还有机会获取到锁,唤醒后面的先从,称之为传播 = null; // help GC
failed = false;
if (shouldParkAfterFailedAcquire(p, node) &&
throw new InterruptedException();
} finally {
if (failed)
* Releases a permit, returning it to the semaphore.
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads are trying to acquire a permit, then one is
* selected and given the permit that was just released. That thread
* is (re)enabled for thread scheduling purposes.
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
public void release() {
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
return true;
return false;
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;//加上释放的
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
private void doReleaseShared() {
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//成功设置成 0 之后,将 head 状态设置成传播状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
if (h == head) // loop if head changed
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;