13. 同步工具类的使用
[TOC]
同步工具类,通常利用他们的特性来构建并发安全的类.
常用的同步工具类有:
- 信号量
- 闭锁
- 栅栏
- FutureTask
信号量(Semaphore)
Java的信号量实际上就是基于操作系统的信号量来实现的.
信号量的原理可以参考《互斥的底层实现》关于信号量的描述。简而言之,信号量是用来控制同时访问某个特定资源的操作数量,或者同时执行某个操作的数量。如果数量固定为1,也就是互斥了。
计数信号量相当于若干数量的许可证,持有许可证的线程或者操作可以执行某个方法或者获取某个资源(也是执行操作),当许可证未发放完时,前来的线程都可以领到,当许可证发放完,再来的线程只能等之前的线程执行完成返还了许可证,这个期间它们只能阻塞等待(在Java信号量的实现是阻塞等待)。
计数信号量可以实现某种资源池,分配一个资源计数值减一,返回一个资源计数值加一;或者可以实现对容器施加边界(添加一个元素消耗一个计数值,删除一个元素返回一个计数值)。
Java中的信号量是java.util.concurrent.Semaphore类。
- 使用信号量给容器加上边界:
public class BoundArrayList<T>{
// ArrayList实际上如果不加限制,在加入新的元素时会自动扩容
private final ArrayList<T> list;
private final Semaphore sem;
public BoundArrayList(int bound){
this.list = = new ArrayList<>(bound);
sem = new Semaphore(bound);
}
public void add(T e) throws InterruptedException{
sem.acquire();
boolean isAdded = true;
try{
isAdded= this.list.add(e);
}finally{
// 如果add失败,就释放添加许可
if(!isAdded)
sem.release();
}
}
public void remove(T e){
boolean isRemoved = true;
try{
isRemoved = this.list.remove(e);
}finally{
if(isRemoved)
sem.release();
}
}
}
- 使用锁来实现信号量
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用锁来实现信号量Semaphore
*
*/
public class SemaphoreWithLock {
private final Lock lock = new ReentrantLock();
private final Condition permitAvaliable = lock.newCondition();
private volatile int permitCount;
public SemaphoreWithLock(int count) {
this.permitCount = count;
}
public void acquire() throws InterruptedException {
lock.lock();
try {
if (permitCount > 0) {
permitCount--;
} else {
while (permitCount <= 0)
permitAvaliable.await();
}
} finally {
lock.unlock();
}
}
public void release() {
lock.lock();
try {
permitCount++;
permitAvaliable.signalAll();
} finally {
lock.unlock();
}
}
}
闭锁(CountDownLatch)
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A
CountDownLatch
is initialized with a given count. Theawait
methods block until the current count reaches zero due to invocations of thecountDown()
method, after which all waiting threads are released and any subsequent invocations ofawait
return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using aCyclicBarrier
.
当闭锁减为0之后,在闭锁上阻塞的线程全部被唤醒,此时一个闭锁就失效了,不可循环使用。如果需要再次使用,可以使用CyclicBarrier
.
- CountLatch sample:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
* CountDownLatch是一个倒数闭锁,在此latch上await的线程,会在计数值降为0的时候被唤醒,其他时候阻塞。
*/
public class CountDownLatchTest {
/**
* 模拟join
*
* @throws InterruptedException
*/
@Test
public void simulateJoin() throws InterruptedException {
CountDownLatch end = new CountDownLatch(1);
new Thread(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start!");
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(threadName + " finish!");
end.countDown();
}).start();
end.await();
System.out.println("main end");
}
/**
* 子线程等待main给出开始信号,main线程等待所有的子线程的结束。
* @throws InterruptedException
*
*/
@Test
public void startWithSingal() throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
int threadCOunt = 100;
CountDownLatch end = new CountDownLatch(threadCOunt);
for (int i = 0; i < threadCOunt; i++) {
new Thread(() -> {
try {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " wait start!");
start.await();
System.out.println(threadName + " start.....end!");
end.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
System.out.println("Main start count down");
start.countDown();
end.await();
System.out.println("all end");
}
}
- 使用锁来实现闭锁
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CountDownLatchOnLock {
private int count;
private final Lock lock;
private final Condition down2zero;
public CountDownLatchOnLock(int count) {
this.count = count;
this.lock = new ReentrantLock();
down2zero = lock.newCondition();
}
public void countDown() {
lock.lock();
try {
if (--count <= 0) {
down2zero.signalAll();
}
} finally {
lock.unlock();
}
}
public void await() throws InterruptedException {
lock.lock();
try {
down2zero.await();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchOnLock countdown = new CountDownLatchOnLock(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start");
countdown.countDown();
}).start();
}
countdown.await();
System.out.println("all end");
}
}
栅栏(Barrier)或者屏障
栅栏类似闭锁,它能阻塞一组线程直到某个事件发生。
栅栏与闭锁的关键区别在于,所有线程必须到达栅栏的位置,才能继续执行。闭锁用于等待事件(计数值降为0的事件),而栅栏用于等待其他线程。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
- Test
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
@Test
public void testCyclicBarrier() throws InterruptedException {
int count = 5;
CyclicBarrier barrier = new CyclicBarrier(count);
CountDownLatch countDownLatch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
Runnable r = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println(name + " start wait");
barrier.await();
System.out.println(name + " finish wait");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
new Thread(r).start();
}
// 等待子线程结束
countDownLatch.await();
}
@Test
public void testCyclicBarrierWithAction() throws InterruptedException {
int count = 5;
CyclicBarrier barrier = new CyclicBarrier(count, () -> {
System.out.println("all arrive");
});
// ...和上面方法内容一致,区别在于所有线程到达await()处时,会触发action,打印“all arrive”。
}
}
栅栏可以使用闭锁来实现,每个线程达到栅栏状态时CountDown,所有的线程到达栅栏即CountDown递减到了0,唤醒闭锁上的await线程。但是闭锁是一次性对象,栅栏可以重复利用。看一下栅栏时如何实现的。
栅栏是基于锁和条件对象,以及一个计数值,在执行await方法时,会判断计数值,如果非0,就阻塞在条件对象上,否则就唤醒该条件对象上的所有线程。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Generation是为了reset服务的,内部只有一个isbroken状态。
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 如果设定了到达栅栏的触发动作
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 阻塞在条件对象上,调用await会释放锁
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
Barrier是在await()方法内判断count值来决定是否需要继续阻塞。类似把countDown的值放在了await逻辑中,当指定count的线程到达await()位置时,阻塞状态解除,除此之外,还可以配置阻塞解除时的action(Runnable对象)。且和CountDownLatch不同的是,countdownLatch是一次性的,barrier支持reset来复用。
FutureTask
FutureTask实现了Future的语义(基于回调的“未来式”),表示一直抽象的可生成结果的计算。FutureTask表示的计算是通过Callable实现的,相当于一种可生成结果的Runnable,并且可以处于以下三种状态:等待运行、正在运行和运行完成,运行完成即可以获取计算结果,否在获取结果的操作会阻塞。
FutureTask在任务执行框架(线程池)中代表一种异步任务。在单个线程中也可使用,作为一种异步任务。
A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset()).
A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.
In addition to serving as a standalone class, this class provides protected functionality that may be useful when creating customized task classes.
- Sample
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable<Integer> task = () -> {
Thread.sleep(1500);
return new Random().nextInt(1000);
};
FutureTask<Integer> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.isDone());
System.out.println("future result= " + future.get());
}
}
实际上典型使用是Runnable的有返回结果的封装。
同步工具类实现原理
以上工具类,实际上都可以使用Lock来实现,且在底层,都是基于AbstractQueuedSynchroinzer类。
- ReentrantLock && Semaphore && CountDownLatch
查看源码不难发现,它们同步机制的实现,是委托给这个内部类实现的:
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
}
- CyclicBarrier
CyclicBarrier是基于ReentrantLock实现的。
所以这几个同步工具的基础,都是AQS。关于AQS的介绍,在笔记《14.AbstractQueuedSynchronizer源码剖析》中有详细说明,可以说AQS是Concurrent包的基石,而AQS又是站在CAS的基础上的同步工具基类。
- FutureTask
FutureTask没有使用AQS作为委托来实现阻塞和触发,它内部通过run方法的运行结束(正常或异常)来维护一个state值,并且会唤醒阻塞的线程,get方法判断state值来决定是阻塞还是返回值。
部分FutureTask源码:
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// 阻塞直到任务结束
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
// 任务
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 任务执行完成,set方法会修改状态以及唤醒线程
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
参考资料
[1] Java并发编程实战