JUC并发相关

18. 并发终结之Semaphore

2020-09-30  本文已影响0人  涣涣虚心0215

Semaphore(信号量)上定义两种操作: acquire(获取) 和 release(释放)。
当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制,比如流量控制。

源码分析

Semaphore的代码和CountDownLatch的代码结构类似,都是有一个private final Sync sync成员变量,即底层也是通过AQS来进行控制。

  1. AQS部分
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    //跟CountDownLatch类似,用state的个数来控制共享锁数量,permits是许可证意思
    Sync(int permits) {
        setState(permits);
    }
    final int getPermits() {
        return getState();
    }
    //非公平方式获取共享锁,不需要管sync queue,而是直接CAS设置state
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            //如果remaining<0表示已经没有可用的许可证了,
            //     avaliable的个数为0,就return。
            //另外还有个或,CAS设置state到remaining的个数,设置成功则返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    //释放共享锁
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
/**
 * NonFair version
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
        //调用匪类nonfairTryAcquireShared来非公平获取共享锁
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
/**
 * Fair version
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    //公平方式获取共享锁,即需要check当前节点是否有前继节点在sync queue里
    protected int tryAcquireShared(int acquires) {
        //是无限循环
        for (;;) {  
             //如果有前继节点,则返回-1,尝试获取锁失败,加入sync queue
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            //如果remaining<0表示已经没有可用的许可证了,
            //       avaliable的个数为0,就return
            //另外还有个或,CAS设置state到remaining的个数,设置成功则返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
  1. acquire+AQS
    acquire与CountDownLatch的await()方法一模一样,只是tryAcquireShared的实现不一样。Semaphore的实现在上面,注意Fair和NonFair的实现都是无限for循环的,且退出for循环的条件有两个:没有可用的许可证,或者有许可证,CAS设置成功。
    如果CAS更新不成功,表示别的线程获取到资源,当前线程则继续for循环直到没有可用的许可证或者CAS更新成功。
    下面还要重新看下doAcquireSharedInterruptibly方法,在多个acquire的线程release的情况下,PROPAGATE是如何操作的。
    仔细看tryAcquireShared()到setHeadAndPropagate(node, r);这个步骤先去看有没有许可证,然后开始setHead,那么这里就有可能出现队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁并发现没有多余许可证(propagate=0),但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒的可能。
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
//支持中断的共享锁获取方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //tryAcquireShared<0则表示没有可用的许可证了,则当前线程需要进入sync queue
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}   
//在Semaphore重新看一下这个方法
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                 //如果前继节点是head,就看看有没有许可证
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //如果有许可证,就将当前节点设置为head,并且PROPAGATE传播给后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    //propagate>0就表示有多个许可证可以获得,即就可以唤醒多个sync queue里的线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
  1. release+AQS
    release与CountDownLatch的countDown()方法代码一致,只是tryReleaseShared的实现不同。CountDownLatch是对state进行减操作,Semaphore是对state进行加操作。
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
遗留问题PROPAGATE的作用

即如果没有PROPAGATE状态,而是单纯的用propagate参数来控制唤醒下一个节点。
从下面的Test可以看出,Semaphore的许可证有两个,但是我们有四个线程来进行acquire。那么很正常的结果:

  1. Thread1和Thread2能正常acquire,并且执行业务逻辑;
  2. Thread3和Thread4则因为没有许可证而进入等待,且sync queue里面有三个节点:head(status:-1),Node1(status:-1, Thread:Thread3)和Node2(status:0, Thread:Thread4)。
  3. 然后Thread1和Thread2分别等待20s之后开始release,这里极端一点,同时release。
  4. 那么Thread1的release会将head从SIGNAL设置为0,并且唤醒Node1,即Thread3线程操作,Thread3会进行tryAcquireShared()查看许可证的个数,这时为0(剩余未0),则propagate=0,这时候Thread3还没开始setHeadAndPropagate()。
  5. 这时候Thread2的release也开始了,它发现head其实已经不是SIGNAL了(与4操作里的head是同一个,Thread3还没setHead),就不会唤醒后继节点,那么这里是不会唤醒Node2,即Thread4的线程(记住假设这里不做PROPAGATE的那部分操作)。那么回到4步骤,Thread3被唤醒之后开始setHeadAndPropagate(),这个时候步骤4里拿到的许可证为0,propagate=0,则不会唤醒后继节点了,那么Thread4则不会被唤醒了,一直挂起着。
  6. 现在有了PROPAGATE状态,Thread2release的时候发现head的状态为0,则将head的状态更新为PROPAGATE,那么Thread3在setHeadAndPropagate()里面尽管propagate参数是0表示没有许可证,但是head的Status是PROPAGATE(-3)<0的,那么也会唤醒它的后继节点,从而保证release不会出问题。
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);
        new Thread(()->{
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"===get resource");
                Thread.sleep(20000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread1").start();

        new Thread(()->{
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"===get resource");
                Thread.sleep(20000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread2").start();

        new Thread(()->{
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"===get resource");
                Thread.sleep(20000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread3").start();

        new Thread(()->{
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"===get resource");
                Thread.sleep(20000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread4").start();
    }
}
上一篇下一篇

猜你喜欢

热点阅读