线程之间的通信
前言
最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。
timg.jpeg前言
每个线程单独的运行,没有多大的价值。线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一。当线程存在通信指挥,系统间的交互性会更强大,在提高cpu利用率的同时还会使开发对线程任务在处理的过程中进行有效的把控和监督。
等待/通知机制
方法名词 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是改线程获取到了对象的锁 |
notifyAll() | 通知所有等待在该对象上的线程 |
wait() | 调用该方法的线程进入waiting状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁。 |
wait(long) | 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回 |
wait(long,int) | 对于超时时间更细粒度的控制,可以达到纳秒 |
先看一个demo:
/**
* wait notfiy 方法,wait释放锁,notfiy不释放锁
* 为什么先让t2线程先执行?
* 因为t2执行了,那么lock.wait()释放锁,t1进入执行,,然后执行到第5次的时候,发出通知,执行lock.notify();但是lock.notify();并没有释放锁,所以继续执行,执行10次之后释放锁,
* t2执行,弊端就是没有实时通知,解决方案看第三个列子
*
* 如果t1先执行,t2后执行,那么执行10次之后list长度等于10,释放锁,执行t2,然后wait等待,下面的代码就不执行了
*
*/
public class ListAdd2 {
private volatile static List list = new ArrayList();
public void add(){
list.add("bjsxt");
}
public int size(){
return list.size();
}
public static void main(String[] args) {
final ListAdd2 list2 = new ListAdd2();
// 1 实例化出来一个lock
//当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用
final Object lock = new Object();
Thread t1 = new Thread(() -> {
try {
synchronized (lock) {
for(int i = 0; i <10; i++){
list2.add();
System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if(list2.size() == 5){
System.out.println("已经发出通知..");
lock.notify();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
if(list2.size() != 5){
try {
//System.out.println("t2进入...");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}
}, "t2");
t2.start();
t1.start();
}
}
执行结果:
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t2收到通知线程停止..
Exception in thread "t2" java.lang.RuntimeException
at com.zhihao.miao.day04.ListAdd2.lambda$main$1(ListAdd2.java:64)
at java.lang.Thread.run(Thread.java:745)
总结:
1.使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
2.调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。 释放锁。
3.notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。notify()或notifyAll()不释放锁。
4.notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
5.从wait()方法返回的前提是获得了调用对象的锁。
6.wait和notify必须配合synchronized关键字使用
上面的demo有个弊端就是当list.size==5,执行了lock.notify();但是并没有释放锁,在执行完list遍历之后释放了锁,而我们的需求是list.size等于5的时候就执行等待的操作,怎样解决这样的场景呢?
/**
*CountDownLatch解决了实时性,当我们在list.size()等于5的时候就唤醒countDownLatch.await()的线程,也就保存了实时性
*而使用notify却不能释放锁,等待循环结束释放锁,
*/
public class ListAdd3 {
private volatile static List list = new ArrayList();
public void add(){
list.add("bjsxt");
}
public int size(){
return list.size();
}
public static void main(String[] args) {
final ListAdd2 list2 = new ListAdd2();
final CountDownLatch countDownLatch = new CountDownLatch(1); //这边的参数为几,下面的countDown()就要执行几次才能唤醒
Thread t1 = new Thread(() -> {
try {
for(int i = 0; i <10; i++){
list2.add();
System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if(list2.size() == 5){
System.out.println("已经发出通知..");
countDownLatch.countDown(); //释放锁
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
if(list2.size() != 5){
try {
countDownLatch.await();//等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}, "t2");
t2.start();
t1.start();
}
}
执行结果:
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
Exception in thread "t2" java.lang.RuntimeException
当前线程:t1添加了一个元素..
at com.zhihao.miao.day04.ListAdd3.lambda$main$1(ListAdd3.java:53)
当前线程:t2收到通知线程停止..
at java.lang.Thread.run(Thread.java:745)
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
实现了我们的业务逻辑
Thread.join()的使用
如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。
public class Join {
public static void main(String[] args) throws Exception {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " terminate.");
}
}
}
main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
执行结果很明显,因为main线程先启动,调用了join表名正在执行完的线程执行结果才执行下面的线程。
查看了join的源码可知:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。
Condition的实现分析
1.等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
2.等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
3.通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
public class TestCondition {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void method1(){
try {
lock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
condition.await(); // Object wait
System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2(){
try {
lock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
condition.signal(); //Object notify
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final TestCondition uc = new TestCondition();
Thread t1 = new Thread(() -> uc.method1(), "t1");
Thread t2 = new Thread(() -> uc.method2(), "t2");
t1.start(); //t1先执行,然后阻塞,然后t2线程执行await释放锁,
t2.start();
}
}
当前线程:t1进入等待状态..
当前线程:t1释放锁..
当前线程:t2进入..
当前线程:t2发出唤醒..
当前线程:t1继续执行...
测试signal或者signalAll方法是否也和notify一样不释放锁?
public class TestCondition2 {
private volatile static List list = new ArrayList();
public void add(){
list.add("miaozhihao");
}
public int size(){
return list.size();
}
public static void main(String[] args) {
final ListAdd2 list2 = new ListAdd2();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread t1 = new Thread(() -> {
try {
lock.lock();
for(int i = 0; i <10; i++){
list2.add();
System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if(list2.size() == 5){
System.out.println("已经发出通知..");
condition.signal();
}
}
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
lock.lock();
if(list2.size() != 5){
try {
System.out.println("线程进入await方法");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
lock.unlock();
throw new RuntimeException();
}
, "t2");
t2.start();
t1.start();
}
}
执行结果:
线程进入await方法
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t2收到通知线程停止..
Exception in thread "t2" java.lang.RuntimeException
at com.zhihao.miao.day10.TestCondition2.lambda$main$1(TestCondition2.java:63)
at java.lang.Thread.run(Thread.java:745)
由此可见答案是肯定的,即signal合signalAll也是不释放锁的。
多Condition的使用
public class TestManyCondition {
private ReentrantLock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
public void m1(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
c1.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m2(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
c1.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m3(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
c2.await();
System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m4(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
c1.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m5(){
try {
lock.lock();
System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final TestManyCondition umc = new TestManyCondition();
Thread t1 = new Thread(() -> umc.m1(),"t1");
Thread t2 = new Thread(() -> umc.m2(),"t2");
Thread t3 = new Thread(() -> umc.m3(),"t3");
Thread t4 = new Thread(() -> umc.m4(),"t4");
Thread t5 = new Thread(() -> umc.m5(),"t5");
t1.start(); // c1
t2.start(); // c1
t3.start(); // c2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t4.start(); // 唤醒了c1
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t5.start(); // 唤醒了c2
}
}
当前线程:t1进入方法m1等待..
当前线程:t2进入方法m2等待..
当前线程:t3进入方法m3等待..
当前线程:t4唤醒..
当前线程:t1方法m1继续..
当前线程:t2方法m2继续..
当前线程:t5唤醒..
当前线程:t3方法m3继续..
t4唤醒了t1和t2,而t5唤醒了t3,实际工作中根据业务的不同来选择使用。