Java并发编程笔记(五):同步工具类
同步工具类即用来控制并发,协调线程控制流的工具类,BlockingQueue因为提供了take,put等阻塞方法,故也可以作为同步工具类使用,其他的同步工具类主要有信号量,栅栏,闭锁等,这些类功能很强大,使用起来非常简单,且易于理解,它们的内部实现大多数利用了AQS这个机制。
一、闭锁(Latch)
Java里对闭锁这个功能提供了CountDownLatch这个同步工具类。其作用简单理解就是像“门”一样,如果不满足条件,线程都会被大门挡住,当条件满足时,大门会打开。下面是一个简单的Demo:
public class CountDownLatchExample {
public void timeTest(int nThreads) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread thread = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "被阻塞在大门前");
startGate.await();
dosomething();
System.out.println(Thread.currentThread().getName() + "完成工作,将自己的门关闭");
endGate.countDown();
} catch (InterruptedException e) {
}
});
thread.start();
Thread.sleep(1000);
}
long start = System.currentTimeMillis();
startGate.countDown();
endGate.await();
long end = System.currentTimeMillis();
System.out.println("need : " + (end - start) + "ms");
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchExample example = new CountDownLatchExample();
example.timeTest(4);
}
private void dosomething() {
System.out.println(Thread.currentThread().getName() + "开始工作");
Random random = new Random();
for (int i = 0; i < random.nextInt(999999999); i++) {
}
}
}
运行代码,会看到如下输出:
Thread-0被阻塞在大门前
Thread-1被阻塞在大门前
Thread-2被阻塞在大门前
Thread-3被阻塞在大门前
Thread-0开始工作
Thread-3开始工作
Thread-2开始工作
Thread-1开始工作
Thread-1完成工作,将自己的门关闭
Thread-0完成工作,将自己的门关闭
Thread-3完成工作,将自己的门关闭
Thread-2完成工作,将自己的门关闭
need : 9ms
从输出不难看到,就和刚刚描述的那样,线程都被大门卡主了,直到latch内部的值为0,才打开大门。在本例里,latch的作用就是让线程同时开始工作,使得线程公平的竞争CPU资源。
二、栅栏(barrier)
栅栏类似于闭锁,他们的关键区别是闭锁用于等待某个事件(对于CountDownLatch来说就是等待内部的值为0这个事件),栅栏用于等待其他线程,例如聚会的时候,约定好9点在万科门口集合,先到的同学原地等待其他同学的到来。下面是一个Demo:
public class CycleBarrierExample extends Thread {
private final CyclicBarrier barrier;
public CycleBarrierExample(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在工作!");
try {
Thread.sleep(new Random().nextInt(2000));
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " 完成工作");
barrier.await(); //等待其他线程也完成
} catch (InterruptedException | BrokenBarrierException e) {
}
System.out.println(System.currentTimeMillis() + ":" + "其他线程也完成了工作");
}
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
CyclicBarrier barrier = new CyclicBarrier(3);
CycleBarrierExample example1 = new CycleBarrierExample(barrier);
CycleBarrierExample example2 = new CycleBarrierExample(barrier);
CycleBarrierExample example3 = new CycleBarrierExample(barrier);
service.execute(example1);
service.execute(example2);
service.execute(example3);
service.shutdown();
service.awaitTermination(200, TimeUnit.SECONDS);
}
}
运行代码,可以看到如下输出:
pool-1-thread-2 正在工作!
pool-1-thread-1 正在工作!
pool-1-thread-3 正在工作!
1530427790381 : pool-1-thread-2 完成工作
1530427790604 : pool-1-thread-3 完成工作
1530427791208 : pool-1-thread-1 完成工作
1530427791208:其他线程也完成了工作
1530427791208:其他线程也完成了工作
1530427791208:其他线程也完成了工作
前六行输出顺序不一定就是上面这样,不过后面三行一定是最后的三行。在Demo里,创建了三个线程,这个三个线程分别做一些事情(这里使用sleep来模拟),也创建了一个barrier来控制,注意barrier构造函数里的int类型参数代表的就是线程总数(拿聚会集合来讲,就是一共有几个人参加聚会)。先完成工作的线程必须等待其他线程,不能继续往下执行,这点从输出中的时间戳不难看出。barrier比较适合的场景是将大任务切分成小任务,然后合并结果,小任务处理的时间不确定,但是为了方便合并结果,需要等当前切分的小任务都完成了再合并,这样整个“分而治之”的过程会比较稳定。
三、信号量(semaphore)
信号量内部也有一个值,整个值代表的是允许量(称为通行证的数量会比较容易理解),举个例子就是组团去某个公司参观,因为公司有门禁,每次只能20个人进去,同时这20个人比较有通行证才行。持有通行证的同学进入公司参观,参观后要将通行证归还,以便其他同学拿到通行证继续参观公司。一个Demo如下:
public class SemaphoreExample {
private final Semaphore semaphore;
public SemaphoreExample(int tokens) {
this.semaphore = new Semaphore(tokens);
}
public void visit() {
try {
//请求获取通行证,获取不到则阻塞直到有空余的通行证或者被中断
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " - 允许参观" );
dosomething();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//参观完或者发生什么意外事件都要归还通行证
semaphore.release();
}
}
private void dosomething() {
try {
Thread.sleep(new Random().nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
SemaphoreExample semaphoreExample = new SemaphoreExample(20);
for (int i = 0; i < 40; i++) {
Thread thread = new Thread(semaphoreExample::visit);
thread.start();
}
}
}
代码运行输出就不贴出来了,有些长。上面这一小段Demo就是模拟了上面描述的场景,多运行几次,不难看到中间确实会停顿那么一小段时间,就是因为通行证不够了,线程被阻塞了。信号量还可以应用在控制同一时间的并发量,做一些简单的流量控制。
小结
Java里的同步工具类功能很强大,使用起来也很简单,一般来说仅仅需要使用几个API就能满足大部分场景。善用这些工具类,可以大大提升开发效率,且构建出安全,健壮的项目。