JAVA-Lock解析-七-Semaphore解析

2019-10-15  本文已影响0人  AlanSun2

信号量 Semaphore 是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么 acquire 方法将一直阻塞,直到其它线程释放许可。

Semaphore 通常用于线程对统一资源的访问控制。需要注意的是它没有实现 Lock 接口,也不支持 Condition。官方示例:

 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

在获取项目之前,每个线程必须从信号灯获取许可,以确保可以使用该项目。线程完成该项目后,将其返回到池中,并向信号量返回一个许可,从而允许另一个线程获取该项目。

Semaphore 内部使用了共享锁,没有独占锁和 ReentrantLock 正好相反,还有一个 ReadLock in ReentrantReadWriteLock 的区别是 Semaphore 可以在构造方法中指定共享锁的数量,ReadLock 只能是默认的 65535,而且大于 65535 还会抛出异常。Semaphore 在大于指定的共享锁数量时会阻塞。

同样 Semaphore 有三个静态内部类 Sync,NonFairSync,FairSync。

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }
        
        //调用过程:
        //Semaphore#acquire-> NonFairSync#tryAcquireShared -> 本方法
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {//采用自旋直到 没有空闲的共享锁或 CAS 修改 state 的值成功
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        
        //调用过程:
        //Semaphore#release -> AQS#releaseShared -> 本方法
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {//自旋直到 CAS 成功
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow,表示 releases 不能是一个负值
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        
        //减少许可证的数量
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        
        //把当前的可用的许可证清空
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * 非公平锁实现
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平锁实现
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }
        
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读