Semaphore原理

2018-11-11  本文已影响0人  光_93e5

源码解析

Semaphore(信号量),这个类是用来控制并发时线程的数量的,首先这个类是实现了序列化接口
public class Semaphore implements Serializable(序列化)

我们主要来看看内部的一个结构和主要的方法acquire(获得)和release(释放)方法

其中这个类中有几个内部类,首先是抽象类 Sync 继承了同步器,

 abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        //构造方法,设置了同步状态的值
        Sync(int permits) {
            setState(permits);
        }
        //返回当前的同步状态的值
        final int getPermits() {
            return getState();
        }
        //尝试获取不公平的锁
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //得到的同步状态的值
                int available = getState();
                //根据得到的值尝试修改同步状态的值
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        //尝试释放锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取当前同步状态的值
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 根据得到的值尝试修改同步状态的值
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        //减少同步状态值
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
       //返回剩余的同步状态值
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

然后分别有两个静态的内部类 NonfairSync(不公平模式)和FairSync(公平模式)实现了抽象类Sync,可以看到分别重写了同步器中的tryAcquireShared方法,下面是源码

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        //调用了父类的构造函数
        NonfairSync(int permits) {
            super(permits);
        }
       //尝试获取锁,实际调用父类的nonfairTryAcquireShared方法
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
       //调用了父类的构造方法
        FairSync(int permits) {
            super(permits);
        }
        //可以看到和父类的方法中,只是多了一个hasQueuedPredecessors方法来先判断当前的
       // 对列中是否还有任务
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //判断当前的队列中是否还有任务
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

接下来是构造方法

    //默认创建一个同步状态值为permits的不公平模式
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    //根据boolean判断生成不公平还是公平的模式
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

然后是acquire() 方法

    //设置每个线程每个线程需要占用的同步状态值 默认为1
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    //自定义每个线程每个线程需要占用的同步状态值
    public void acquire(int permits) throws InterruptedException {
        //必须大于0,否则抛出异常
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

然后是release方法

   //释放当前线程所占用的同步状态值,默认为1
    public void release() {
        sync.releaseShared(1);
    }
 //释放当前线程所占用的同步状态值
    public void release(int permits) {
        //必须大于0 否则抛出异常
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

下面是一个Semaphore使用的示例


public class Demo extends Thread {
    private  Semap semap;
    public Demo(Semap semap){
        this.semap=semap;
    }
    @Override
    public void run(){
          semap.Test();
    }
}

public class Semap {
        //设置一个同步值为3的Semaphore
        Semaphore semaphore=new Semaphore(3);
        public void Test(){
            try {
                //设置每个线程要占用的同步值 默认为1
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " 开始: " + LocalTime.now());
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " 结束: " + LocalTime.now());
                //释放所占用的状态值
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
}
//运行
public class SemaphoreDemo{
    public static void main(String[] args) {
        Semap semap=new Semap();
        for(int i=0;i<5;i++){
            new Demo(semap).start();
        }
    }
}
//运行结果
Thread-0 开始: 15:35:38.643
Thread-1 开始: 15:35:38.643
Thread-2 开始: 15:35:38.643
Thread-0 结束: 15:35:39.643
Thread-1 结束: 15:35:39.643
Thread-2 结束: 15:35:39.643
Thread-4 开始: 15:35:39.643
Thread-3 开始: 15:35:39.643
Thread-3 结束: 15:35:40.644
Thread-4 结束: 15:35:40.644

总结

可以看出来其实Semaphore内部其实是根据同步状态的值来限制并发的时线程的数量,当同步状态值为0时,后来的线程将被阻塞,直到有线程释放。

上一篇下一篇

猜你喜欢

热点阅读