Semaphore

2020-06-26  本文已影响0人  奔向学霸的路上

Semaphore概念

semaphore,又名信号量,类似于“许可证”的概念,它实际上是维护了一些“许可证”,用来控制同时允许共享资源的最大线程数。比如疫情期间的图书馆,每天只允许一定数量的人进入,其他人来了之后就需要等待。

应用场景

Semaphore用作流量控制,特别是资源有限的情况下。常用来举例的一个场景是说,数据库的连接,比如我们要读取的数据量比较大,启动几十个线程并发读取,但是由于数据库连接数只有10个,所以这个时候需要Semaphore来控制最多10个线程来请求数据库。

代码示例

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其他方法

int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

Semaphore原理

Semaphore底层也是基于AQS分别实现了公平与非公平策略,需注意semaphore的锁是共享锁。

Semaphore源码分析

semaphore的两种构造函数

获取锁

  1. 接受一个许可数量的构造,默认是非公平
/**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  1. 两个参数,其中一个是“许可证”数量,true代表公平策略
/**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

构造一个公平策略Semaphore,state设置为允许的最大许可量

        //FairSync类
        FairSync(int permits) {
            super(permits);
        }
        //Sync类(FairSync类的父类)
        Sync(int permits) {
            setState(permits);
        }
Semaphore公平策略源码解读

当线程调用acquire()方法时,入参"1"表示尝试获取1个许可

/**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     *
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly,线程中断,抛中断异常;尝试获取锁小于0,代表获取失败,加入等待队列

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //获取失败,加入等待队列
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared尝试获取锁,先判断队列中是否存在比当前线程等待时间长的线程;available获取可用的许可证数;remaining申请acquires数量后,剩余的可用许可证数;

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())//先判断队列中是否存在比当前线程等待时间长的线程
                    return -1;
                int available = getState();//获取可用的许可证数
                int remaining = available - acquires;//申请acquires数量后,剩余的可用许可证数
                //两种情况,1,是可用许可证没有了,那么返回剩余许可证数量;2,是许可证还有,CAS尝试更新成功后,返回剩余许可证数
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

当tryAcquireShared(arg) < 0 获取锁失败,加入AQS等待队列,执行doAcquireSharedInterruptibly(arg)方法。

/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //添加一个节点到队列尾,这块的逻辑可以参考AbstractQueuedSynchronizer里的介绍
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();//找当前节点的上一个节点
                if (p == head) {//如果上一个节点正好是头节点
                    int r = tryAcquireShared(arg);//获取共享锁
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//将node节点设置为head节点,r>0说明还有机会获取到锁,唤醒后面的先从,称之为传播
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //如果不是头节点,就不能获取
                //对节点状态进行检查并更新状态,如果线程应该阻塞,返回true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//中断阻塞,并返回当前线程是否阻塞boolean值
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                //取消获取
                cancelAcquire(node);
        }
    }

释放锁
release()方法用于锁的释放

/**
     * Releases a permit, returning it to the semaphore.
     *
     * <p>Releases a permit, increasing the number of available permits by
     * one.  If any threads are trying to acquire a permit, then one is
     * selected and given the permit that was just released.  That thread
     * is (re)enabled for thread scheduling purposes.
     *
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link #acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     */
    public void release() {
        sync.releaseShared(1);
    }

releaseShared方法,如果锁释放成功,唤醒AQS等待队列中的head节点

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared方法其实是对state做加法运算

    protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;//加上释放的
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
    }

doReleaseShared方法唤醒后续线程节点可以来争取信号量

  /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                //获取head的状态
                int ws = h.waitStatus;
                ///头节点线程状态为SIGNAL唤醒后续线程节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒下一个节点
                    unparkSuccessor(h);
                }
                //成功设置成 0 之后,将 head 状态设置成传播状态
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
Semaphore非公平策略源码解读

与公平获取的区别是,无需判断队列,其他部分大致相同

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
上一篇下一篇

猜你喜欢

热点阅读