并发工具类
CountDownLatch
专业术语:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
大白话:就是有N个人投票(线程),统计员等待着所有人投完票的过程。
注意:线程执行完毕后,调用countDown()
,该线程是执行完毕的。
使用:
int size = 10; //定义十个人投票
CountDownLatch latch = new CountDownLatch(size); //定义10个容量计数器
for (int i = 0; i < size; i++) {
// 模拟投票
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 投票...");
latch.countDown(); //投完票计数器减一
}).start();
}
latch.await();//等待计数器为0 才进行下面的任务
System.out.println("所有人投票完毕,统计票数...");
CountDownLatch的api:
- CountDownLatch.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
- CountDownLatch.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
- CountDownLatch.getCount() 返回当前计数。
- CountDownLatch.toString() 返回标识此锁存器及其状态的字符串。状态用括号括起来,包括字符串 "Count =",后跟当前计数。
CyclicBarrier
专业术语:它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。
大白话:就是队员集合(多个线程),已经集合的人等待剩下的人集合完毕后,队长开始统计人数的过程。
注意:这个过程中的线程是出于等待状态的。
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
* 部队集合-> 统计人数 -> 解散
*/
public class Demo {
Random random = new Random();
public void meet(CyclicBarrier cyclicBarrier){
try {
TimeUnit.SECONDS.sleep(random.nextInt(4)); // 随机休息4s的时间
System.out.println(Thread.currentThread().getName() + " 集合...."); //模拟队员集合
cyclicBarrier.await(); //等待其他队员集合
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 解散");
}
public static void main(String[] args) throws Exception {
Demo d = new Demo();
int size = 10; //定义队员数量
//定义屏障数量,并达到数量之后完成对应的操作
CyclicBarrier barrier = new CyclicBarrier(size, () -> System.out.println("开始报数...."));
for (int i = 0; i < size; i++) {
new Thread(() -> d.meet(barrier)).start();
}
}
}
CyclicBarrier的api:
- CyclicBarrier.await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
- CyclicBarrier.getNumberWaiting() 返回当前在屏障处等待的参与者数目。
- CyclicBarrier.getParties() 返回要求启动此 barrier 的参与者数目。
- CyclicBarrier.isBroken() 查询此屏障是否处于损坏状态。
- CyclicBarrier.reset() 将屏障重置为其初始状态。
Semaphore
专业术语:从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
大白话:类似世博会,中国会场里面限定100个人同时参观,出来一个人,在放进去一个。
/**
* 模拟参观世博会,并限制人流
*/
public class Demo {
public void visit(Semaphore semaphore) {
try {
semaphore.acquire(); // 获取信号量
System.out.println(Thread.currentThread().getName() + " 参观会场");
semaphore.release(); // 释放信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 创建一个容量5的公平的信号量
Semaphore semaphore = new Semaphore(5, true);
Demo demo = new Demo();
for (int i = 0; i < 10; i++) {
new Thread(() -> demo.visit(semaphore));
}
}
}
Semaphore的api:
- Semaphore.acquire() 获取信号量
- Semaphore.release() 释放信号量
Exchanger
专业术语:对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
用途:可以用于两个线程某个对象的比对、或者对象的传递
public class Demo {
public void a(Exchanger<String> exch) {
System.out.println("a 方法执行...");
try {
System.out.println("a 线程正在抓取数据...");
Thread.sleep(2000);
System.out.println("a 线程抓取到数据...");
String res = "12345";
System.out.println("a 等待对比结果...");
exch.exchange(res);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void b(Exchanger<String> exch) {
System.out.println("b 方法开始执行...");
try {
System.out.println("b 方法开始抓取数据...");
Thread.sleep(4000);
System.out.println("b 方法抓取数据结束...");
String res = "12345";
String value = exch.exchange(res);
System.out.println("开始进行比对...");
System.out.println("比对结果为:" + value.equals(res));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Demo d = new Demo();
Exchanger<String> exch = new Exchanger<>();
new Thread(() -> d.a(exch)).start();
new Thread(() -> d.b(exch)).start();
}
}
Exchanger的api:
- Exchanger.exchange() 等待另一个线程到达此交换点(除非当前线程被 中断),然后将给定的对象传送给该线程,并接收该线程的对象。
如果另一个线程已经在交换点等待,则出于线程调度目的,继续执行此线程,并接收当前线程传入的对象。当前线程立即返回,接收其他线程传递的交换对象。