JDK源码

java.util.concurrent.Semaphore源码

2018-03-06  本文已影响5人  sunpy

控制并发线程数Semaphore

生活中,我们过桥,如果桥就能过3个人,那么一次就只能走三个人,如果多了,那么就会有人掉河里了。这就是Semaphore控制人数过桥。(而此处就是Semaphore控制只能有特定数量的线程访问指定资源)。

继承与实现关系

public class Semaphore implements java.io.Serializable

Semaphore中的自定义的同步器

    /**
     * 
     * 自定义同步器继承AQS,使用AQS的状态state来控制同时访问的线程数(流量)
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        //创建指定线程数访问的同步器构造器
        Sync(int permits) {
            setState(permits);
        }
        //获取允许线程同时访问的数量
        final int getPermits() {
            return getState();
        }
        
        //采用非公平的方式尝试获取共享状态下的同步状态值
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取当前同步状态值
                int available = getState();
                //计算剩余的同步状态值
                int remaining = available - acquires;
                /**
                 * 如果剩余的同步状态值小于0或者当前的同步状态值为available
                 * 将当前同步状态值更新为remaining
                 */
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    //返回当前最新的同步状态值
                    return remaining;
            }
        }
        
        //采用共享方式释放同步状态值
        protected final boolean tryReleaseShared(int releases) {
            //死循环
            for (;;) {
                //获取当前的同步状态值
                int current = getState();
                //计算如果释放同步状态值之后,得到的结果next
                int next = current + releases;

                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //如果当前同步状态值为current,那么更新当前的同步状态值为next
                if (compareAndSetState(current, next))
                    //返回true
                    return true;
            }
        }

        //减少并发线程的数量
        final void reducePermits(int reductions) {
            //死循环
            for (;;) {
                //获取当前的同步状态值
                int current = getState();
                //计算出剩下的并发线程数
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //如果当前同步状态值为current,那么更新当前的同步状态值为next
                if (compareAndSetState(current, next))
                    return;
            }
        }
        
        //将并发的线程数量调整为0
        final int drainPermits() {
            //死循环
            for (;;) {
                //获取当前的同步状态值
                int current = getState();
                /**
                 * 如果当前的同步状态值为0或者当前同步状态值等于current
                 * 那么将当前的同步状态值current更新为0
                 */
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

非公平同步器

    /**
     * 非公平同步器
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        //非公平同步器构造器
        NonfairSync(int permits) {
            super(permits);
        }
        //在共享模式下尝试获取同步状态值
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

公平同步器

    /**
     * 公平同步器
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        //公平同步器构造器
        FairSync(int permits) {
            super(permits);
        }
        //在共享模式下尝试获取同步状态值
        protected int tryAcquireShared(int acquires) {
            //死循环
            for (;;) {
                //当前队列是否有前驱
                if (hasQueuedPredecessors())
                    return -1;
                //获取当前的同步状态值
                int available = getState();
                //计算出剩下的并发线程数
                int remaining = available - acquires;
                /**
                 * 如果剩下的并发线程数小于0或者当前同步状态值等于available
                 * 那么将当前的同步状态值更新为remaining
                 */
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    //返回计算出的最新同步状态值
                    return remaining;
            }
        }
    }

构造器

    /**
     * 创建一个指定并发线程数的非公平同步器构造器
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 创建一个指定并发线程数、是否公平的同步器构造器
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

方法

    /**
     * 创建一个指定并发线程数的非公平同步器构造器
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 创建一个指定并发线程数、是否公平的同步器构造器
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 在信号量Semaphore中获取一个许可
     * 在获取一个许可前线程将会阻塞,否则线程被中断
     * 整体许可数将减少1
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 
     * 在信号量Semaphore中获取一个许可
     * 在获取一个许可前线程将会阻塞,虽然等待时被中断,但是仍然将继续等待
     * 整体许可数将减少1
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放一个许可,将其返回给信号量
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 释放给定数目的许可,将其返回到信号量
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 返回此信号量中当前可用的许可数
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 获取并返回立即可用的所有许可
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 根据指定的缩减量减小可用许可的数目
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 如果此信号量的公平设置为 true,则返回 true
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 查询是否有线程正在等待获取
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 返回正在等待获取的线程的估计数目
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * 返回一个 collection,包含可能等待获取的线程
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

应用例子

public class SemaphoreTest {  
    private static final int PERSON_NUM=4;  
    private static ExecutorService es=Executors.newFixedThreadPool(PERSON_NUM);  
    private static Semaphore s=new Semaphore(3,true);  
    public static void release(Semaphore s,String name){  
        s.release();  
        System.out.println(name+"已经离开桥了!");  
    }  
    public static void main(String[] args) {  
        es.execute(new Runnable(){  
            @Override  
            public void run() {  
                try {  
                    s.acquire();  
                    System.out.println("甲上桥了!");  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }      
        });  
        es.execute(new Runnable(){  
            @Override  
            public void run() {  
                try {  
                    s.acquire();  
                    System.out.println("乙上桥了!");  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }      
        });  
        es.execute(new Runnable(){  
            @Override  
            public void run() {  
                try {  
                    s.acquire();  
                    System.out.println("丙上桥了!");  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        es.execute(new Runnable(){  
            @Override  
            public void run() {  
                try {  
                    release(s,"甲");  
                    s.acquire();  
                    System.out.println("丁上桥了!");  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
    }  
}  

结果:
甲上桥了!
乙上桥了!
丙上桥了!
甲已经离开桥了!
丁上桥了!

解释:虽然线程池里面有甲乙丙丁四个线程准备过桥Semaphore,但是Semaphore只能让三个人过桥,所以,甲没离开桥上时,丁是无法上桥的,所以甲离开之后,丁就可以上桥了。


---------------------------该源码为jdk1.7版本的

上一篇 下一篇

猜你喜欢

热点阅读