java并发之Semaphore

2020-09-05  本文已影响0人  立志java

java并发之Semaphore

知识导读

用例

Semaphore 称为计数信号量,它允许n个任务同时访问某个资源。Semaphore持有一定数量的执行许可证。

acquire方法用于获取许可证,release方法用于释放许可证

public class SemaphoreTest {
  public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    final  Semaphore sp = new Semaphore(3);
    for(int i=0;i<10;i++){
      Runnable runnable = new Runnable(){
        public void run(){
          try {
            sp.acquire();
          } catch (InterruptedException e1) {
            e1.printStackTrace();
          }
          System.out.println("线程" + Thread.currentThread().getName() + 
                             "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
          try {
            Thread.sleep((long)(Math.random()*10000));
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("线程" + Thread.currentThread().getName() + 
                             "即将离开");                   
          sp.release();
          //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
          System.out.println("线程" + Thread.currentThread().getName() + 
                             "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");           
        }
      };
      service.execute(runnable);            
    }
  }
}

源码解析

Sempaphore的构造方法,创建了内部类Sync的实现,提供了公平模式和非公平模式两种。

//非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sempaphore中的内部类Sync实现了AQS的共享锁模式,通过控制state来控制获取同步状态,当state>0的时候可以获取同步状态。所以初始化的时候指定了state的初始值。

abstract static class Sync extends AbstractQueuedSynchronizer {
   //将state设置为 许可证的最大数量
    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }
}

公平模式

FairSync提供了公平模式的实现,覆写AQS的tryAcquireShared方法。

  1. 先调用hasQueuedPredecessors判断AQS同步队列是否有排在当前线程之前的等待线程,如果有,直接返回复数表示获取同步状态失败,当前线程加入同步队列并阻塞
  2. 如果当前线程是排名最靠前的,则CAS设置state减去申请的值
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        //判断state是否减到0,如果减到了返回负数会阻塞,否则返回正数,获的许可证
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

非公平模式

NonfairSync提供了非公平模式的实现,覆写AQS的tryAcquireShared方法。非公平模式比较简单,直接修改state值

  1. 判断state是否大于需申请的许可证数量
  2. 如果满足,CAS设置state值,将值修改为减去申请数量后的值
static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

for循环+CAS保证并发安全

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

释放许用于可证

Semaphore中release方法用于释放许可证,直接调用内部类Sync释放许可证

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

Sync继承了AQS,覆写了tryReleaseShared方法。由于是共享模式,所以在释放的时候会有多线程并发问题。这里使用for循环加CAS将state值加回去

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读