并发编程之Semaphore原理与应用

2020-12-15  本文已影响0人  一角钱技术

点赞再看,养成习惯,搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

前言

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。

控制并发流程的工具类主要有:

作用 说明
Semaphore 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合 线程只有拿到“许可证”后才能继续运行,相比于其它的同步器,更灵活
CyclicBarrier 线程会等待,直到足够多线程达到了事先规定的数目。一旦达到触发条件,就可以进行下一步的动作 适用于线程之间相互等待处理结果的就绪场景
Phaser 和CyclicBarrier类似,但是计数可变 Java7加入的
CountDownLatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的“等待”和“唤醒” 是Object.wait() 的升级版

简介

Semaphore 信号量,许可,用于控制在一段时间内,可并发访问执行的线程数量。它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。

关于 AQS,可以查看《并发编程之抽象队列同步器AQS应用ReentrantLock》

一个信号量有且仅有 3 种操作,且它们全部是原子的。

Semaphore 管理一系列许可证。

Semaphore 在计数器不为 0 的时候对线程就放行,一旦达到 0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,Semaphore 是不可重入的。

Semaphore 有两种模式,公平模式非公平模式 ,默认是非公平模式。

应用场景

Semaphore可以用来做流量限制,特别是公共资源有限的应用场景,比如说数据库连接。

由于 release() 释放许可时,未对释放许可数做限制,所有可以通过该方法增加总的许可数量; reducePermits() 方法可以减少总的许可数量,通过这两个方法可以到达动态调整许可的

分析:假如有一个需求,需读取几个万个文件的数据,因为都是IO密集型,我们可以启动几十个线程并发的读取,但是如果读取到内存后,还需要存储到数据库,而数据库的连接数只有10个,这时候我们就必须要控制只有10个线程同时获取到数据库连接,否则会抛出异常提示无法连接数据库。针对这种情况,我们就可以使用Semaphore来做流量控制。

代码如下

package com.niuh.tools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * Semaphore示例
 * </p>
 */
public class SemaphoreRunner {
    /**
     * 线程数量
     */
    private static final int THREAD_COUNT = 30;

    /**
     * 线程池
     */
    private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        // 获取一个"许可证"
                        semaphore.acquire();

                        // 模拟数据保存
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println("save date...");

                        // 执行完后,归还"许可证"
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executor.shutdown();
    }
}

源码分析

Semaphore 类图


其内部主要变量和方法如下:


框架流程图如下:


20201109104031419.png

构造函数

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
* @param permits 总许可数
* @param fair fair=true 公平锁 fair=false 非公平锁
*/
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

内部类同步器

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // 赋值setState为总许可数
    Sync(int permits) {
        setState(permits);
    }
    // 剩余许可数
    final int getPermits() {
        return getState();
    }
    // 自旋 + CAS 非公平获取
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 剩余可用许可数
            int available = getState();
            // 本次获取许可后,剩余许可
            int remaining = available - acquires;
            // 如果获取后,剩余许可大于0,则CAS更新剩余许可,否则获取更新失败
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 自旋 + CAS 释放许可
    // 由于未对释放许可数做限制,所以可以通过release动态增加许可数量
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 当前剩余许可
            int current = getState();
            // 许可可更新值
            int next = current + releases;
            // 如果许可更新值为负数,说明许可数量益处,抛出错误
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS更新许可数量
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 自旋 + CAS 减少许可数量
    final void reducePermits(int reductions) {
        for (;;) {
            // 当前剩余许可
            int current = getState();
            // 更新值
            int next = current - reductions;
            // 如果更新值比当前剩余许可大,抛出益处
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS 更新许可数
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 丢弃所有许可
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

非公平模式

/**
* 非公平模式
*/
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

公平模式

/**
* 公平模式
*/
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }
    // 公平模式获取许可
    // 公平模式不论许可是否充足,都会判断同步队列中是否线程在等待,如果有,获取失败,排队阻塞
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果有线程在排队,立即返回
            if (hasQueuedPredecessors())
                return -1;
            // 自旋 + CAS获取许可
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

获取许可

Semaphore 提供了两种获取资源的方式。

响应中断获取资源

两个方法支持 Interrupt 中断机制,可使用 acquire() 方法每次获取一个信号量,也可以使用 acquire(int permits) 方法获取指定数量的信号量 。

从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到:

  1. 某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
  2. 某些其他线程中断当前线程

如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态。

acquire执行流程

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 获取许可,剩余许可 >= 0,则获取许可成功 <0 获取许可失败,进入排队
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/**
 * 获取许可失败,当前线程进入同步队列,排队阻塞
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 创建同步队列节点,并入队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 如果当前节点是第二个节点,尝试获取锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

代码的执行步骤如下:


AQS 子类使用共享模式,需要实现 tryAcquireShared() 方法。

  1. 在公平锁中还是与ReentrantLock中的操作一样,先判断同步队列中是不是还有其他的等待线程,有则直接返回失败。否则对 state 值进行减操作并返回剩下的信号量。
  2. 非公平锁直接调用了父类中的 nonfairTryAcquireShared 和 ReentrantLock 一样。
// 非公平锁的获取方式
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();//获取去中的信号量数
        int remaining = available - acquires;//剩余信号量数
        //1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数
        //2.信号量数小于等于0,直接返回负数
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

// 公平锁获取
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1; 
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

变量 state 采用 volatile 可见修饰。

/**
  * The synchronization state.
*/
private volatile int state;
 
/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a <tt>volatile</tt> read.
 * @return current state value
*/
protected final int getState() {
    return state;
}

不响应中断获取资源

两个方法不响应 Interrupt 中断机制,其它功能与 acquire() 方法一致。

从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到:

  1. 某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程;
  2. 如果当前线程在等待许可时被中断,那么它会接着等待,但是与没有发生中断相比,为线程分配许可的时间可能改变。
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

尝试获得信号量

尝试获得信号量有三个方法。

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

释放归还许可

release 方法,主要作用是释放资源,需要保证 release 的执行,否则线程退出但是资源没有释放。

//  尝试释放锁
public final boolean release(int arg) {
    // 如果释放锁成功 唤醒同步队列中的后继节点
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 为了方便对比把两个代码放在一块 可以看到 release 中的结构完全一样
// 区别就在于 doReleaseShared 中有更多的判断操作
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  //在里面执行的 unparkSuccessor(h)
        return true;
    }
    return false;
}

子类实现共享模式的类需要实现 tryReleaseShared() 方法判断是否释放成功。

// 由于未对释放许可数做限制,所以可以通过release动态增加许可数量
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前许可数量
        int current = getState();
        //计算回收后的数量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS改变许可数量成功,返回true
        if (compareAndSetState(current, next))
            return true;
    }
}

一旦 CAS 改变许可数量成功,就调用 doReleaseShared() 方法释放阻塞的线程。

private void doReleaseShared() {
    // 自旋,唤醒等待的第一个线程(其它线程将由第一个线程向后传递唤醒)
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒第一个等待线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

其他方法

获取当前剩余的信号量数量

public int availablePermits() {
    return sync.getPermits();
}

// Sync
final int getPermits() {
    return getState();
}

耗尽许可数量

public int drainPermits() {
    return sync.drainPermits();
}

// Sync
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

缩减许可数量

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

// Sync
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

上述两个方法对共享资源数量的修改操作有两点需要注意

判断 AQS 同步队列中是否还有 Node

public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads() {
   //头结点不等于尾节点就说明链表中还有元素
   return head != tail;
}

总结

PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

上一篇下一篇

猜你喜欢

热点阅读