Java 并发程序员

【Java 并发笔记】Semaphore 相关整理

2019-01-21  本文已影响9人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

1.1 Semaphore 的应用场景

public class SemaphoreDemo {

    private static Semaphore s = new Semaphore(2);

    static class ParkTask implements Runnable {
        private String name;

        public ParkTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("Thread " + this.name + " start...");
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }

        public static void main(String[] args) {
            ExecutorService pool = Executors.newCachedThreadPool();
            pool.submit(new ParkTask("1"));
            pool.submit(new ParkTask("2"));
            pool.submit(new ParkTask("3"));
            pool.submit(new ParkTask("4"));
            pool.submit(new ParkTask("5"));
            pool.submit(new ParkTask("6"));
            pool.shutdown();
        }
}

/**
--- print ---
Thread 2 start...
Thread 6 start...
Thread 3 start...
Thread 1 start...
Thread 4 start...
Thread 5 start...
*/

2. Semaphore 原理

Semaphore 类图
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)

// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)

构造函数

//permits是允许同时运行的线程数目
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
NonfairSync(int permits) {
      super(permits);
}
......
Sync(int permits) {
      setState(permits);
}

响应中断获取资源

//从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到
//1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
//2.某些其他线程中断当前线程
//如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
// 非公平锁的获取方式
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();//获取去中的信号量数
        int remaining = available - acquires;//剩余信号量数
        //1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数
        //2.信号量数小于等于0,直接返回负数
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

// 公平锁获取
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1; 
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}
/**
  * The synchronization state.
*/
private volatile int state;
 
/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a <tt>volatile</tt> read.
 * @return current state value
*/
protected final int getState() {
    return state;
}

不响应中断获取资源

//从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到
//1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
//2.如果当前线程在等待许可时被中断,那么它会接着等待,但是与没有发生中断相比,为线程分配许可的时间可能改变 
ublic void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}!

尝试获得信号量

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

释放资源

//  尝试释放锁
public final boolean release(int arg) {
    // 如果释放锁成功 唤醒同步队列中的后继节点
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 为了方便对比把两个代码放在一块 可以看到 release 中的结构完全一样
// 区别就在于 doReleaseShared 中有更多的判断操作
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  //在里面执行的 unparkSuccessor(h)
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前许可数量
        int current = getState();
        //计算回收后的数量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS改变许可数量成功,返回true
        if (compareAndSetState(current, next))
            return true;
    }
}

其他方法

获取当前剩余的信号量数量

public int availablePermits() {
    return sync.getPermits();
}

// Sync
final int getPermits() {
    return getState();
}

耗尽许可数量

public int drainPermits() {
    return sync.drainPermits();
}

// Sync
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

缩减许可数量

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

// Sync
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

判断 AQS 同步队列中是否还有 Node

public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads() {
   //头结点不等于尾节点就说明链表中还有元素
   return head != tail;
}

3. 总结

参考资料

https://www.meiwen.com.cn/subject/expkcxtx.html
https://blog.csdn.net/u014634338/article/details/78701445

上一篇 下一篇

猜你喜欢

热点阅读