Java并发编程基础-线程间通信
线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。
volatile和synchronized关键字
Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
以上引用自《Java并发编程的艺术》,同时也说明了我们在 Java并发机制的底层实现原理-volatile 中所提到的那个问题,为什么另一个线程会无法跳出循环。而volatile
关键字用来修饰字段或成员变量,就是告知程序该变量在任何时候都需要从共享内存中获取,并且对他的改变必须同步刷新回共享内存,所以能保证对所有线程访问的可见性。
关键字
synchronized
可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
详情参考之前写的另一篇文章 Java并发机制的底层实现原理-synchronized
等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个 过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模 式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良 好的伸缩性,但是在Java语言中如何实现类似的功能呢?
最简单的办法是写一个循环,检查是否当前需要执行对应操作,直到满足条件执行,退出循环。
while (value != desire) {
Thread.sleep(1000);
}
doSomeThing();
上面这段伪代码在条件不满足的时候睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似可以实现所需的功能,但是会存在如下问题:
- 难以确保及时性,比如无法及时发现条件已经变化,也就是及时性难以保证
- 难以降低开销,如果将睡眠时间缩短,比如睡眠1毫秒,这样能更迅速的发现条件变化,但是如果条件长时间没有变化,则又会加剧消耗更多的处理器资源,造成无端的浪费
如何解决以上两个问题?Java通过内置的等待/通知机制能够很好的解决这个矛盾并且实现上述需求。
下表为等待/通知机制的相关方法:
方法名称 | 方法描述 |
---|---|
notify() |
通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提 是该线程获取到了对象的锁 |
notifyAll() |
通知所有等待在该对象上的线程 |
wait() |
调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被 中断才会返回,需要注意,调用wait()方法后,会释放对象的锁 |
wait(long) |
超时等待一段时间,这里的参数时间是毫秒也就是等待长达n毫秒, 如果没有通知就超时返回 |
wait(long, int) |
对于超时时间更细力度的控制,可以控制到纳秒 |
等待/通知机制,是指一个线程A调用了对象O的
wait()
方法进入等待状态,而另一个线程B调用了对象O的notify()
或者notifyAll()
方法,线程A收到通知后从对象O的wait()
方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()
和notify/notifyAll()
的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
如下所示代码中,首先启动了waitThread线程,该线程获取到了lock对象的锁,随后调用了wait方法释放锁,等待其他线程通知唤醒自身,随后等待一毫秒启动notifyThread线程,因为在一定情况下两个线程同时启动是有可能后启动的线程先抢到CPU资源而先执行。由于waitThread释放了锁,所以在notifyThread内一定可以获取到锁,随后notifyThread调用notifyAll方法唤醒等待的waitThread,此时waitThread不能立即获取到锁,需要等待notifyThread释放锁时候才可以获取到锁,即在调用notifyAll方法5毫秒之后waitThread重新获取到锁并且结束循环,执行最后的代码,程序结束。
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
Thread.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
@Override
public void run() {
// 加锁,拥有lock对象的锁
synchronized (lock) {
// 当条件不满足时,继续wait,同时释放了lock的锁
while (flag) {
try {
System.out.println(Thread.currentThread().getName() + " flag is true. wait at " + System.currentTimeMillis());
lock.wait();
} catch (InterruptedException ex) {
}
}
// 条件满足时,跳出循环,执行最后的代码
System.out.println(Thread.currentThread().getName() + " flag is false. running at " + System.currentTimeMillis());
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
// 加锁,拥有lock对象的锁
synchronized (lock) {
// 获取lock对象的锁,然后进行通知,通知时不会释放lock的锁,
// 知道当前线程释放了lock后,WaitThread才能从wait方法中返回
System.out.println(Thread.currentThread().getName() + " hold lock. notify at " + System.currentTimeMillis());
lock.notifyAll();
flag = false;
try {
Thread.sleep(5);
} catch (InterruptedException e) {
}
}
}
}
}
细节如下:
- 使用
wait()
、notify()
和notifyAll()
时需要先对调用对象加锁 - 调用
wait()
方法后,线程状态由RUNNING
变为WAITING
,并将当前线程放置到对象的 等待队列 -
notify()
或notifyAll()
方法调用后,等待线程依旧不会从wait()
返回,需要调用notify()
或notifAll()
的线程释放锁之后,等待线程才有机会从wait()
返回 -
notify()
方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()
方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING
变为BLOCKED
- 从
wait()
方法返回的前提是获得了调用对象的锁
总结
从上述WaitNotify实例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)
等待方遵循如下原则:
- 获取对象的锁
- 如果条件不满足,那么调用
wait()
方法等待,被通知后仍要检查条件 - 条件满足则执行对应的逻辑
对应伪代码如下:
synchronized(对象) {
while(条件不满足) {
对象.wait();
}
}
通知方遵循如下原则:
- 获得对象的锁
- 改变条件
- 通知所有等待在对象上的线程
对应的伪代码如下:
synchronized(对象) {
改变条件
对象.notifyAll();
}
管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream
、PipedInputStream
、PipedReader
和PipedWriter
,前两种面向字节,而后两种面向字符。
以下代码使用PipedReader
和PipedWriter
为例,从控制台接收用户输入再通过输入流传递到另一线程中的输出流,从而实现了线程间以流的方式的数据传输:
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader(); // 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
// 从控制台接收输入
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
注意对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输 出流绑定起来,对于该流的访问将会抛出异常。
Thread.join()的使用
如果A线程执行了Thread.join()
,那么表示当前线程需要等待A线程执行完毕返回之后才会继续执行其后的代码,除了该方法以外,Thread还提供了了join(long millis)
和join(long millis,int nanos)
方法,这两个方法具有超时性质,如果在规定等待时间内A线程未能执行结束,那么程序将从该方法中返回,继续执行其后的代码。
以下代码为Thread.join()方法的示例:
public class Join {
public static void main(String[] args) throws InterruptedException {
Runnable runnableA = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
Thread.sleep(5);
} catch (InterruptedException e) {
}
}
};
Runnable runnableB = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + " end at " + System.currentTimeMillis());
} catch (InterruptedException e) {
}
}
};
Thread threadA = new Thread(runnableA, "Thread-A");
Thread threadB = new Thread(runnableB, "Thread-B");
threadA.start();
threadA.join();
System.out.println(threadA.getName() + " end at " + System.currentTimeMillis());
threadB.start();
threadB.join(5);
System.out.println(threadB.getName() + " returned at " + System.currentTimeMillis());
}
}
该方法执行结果如下:
Thread-A run at 1593665701058
Thread-A end at 1593665701064
Thread-B run at 1593665701064
Thread-B returned at 1593665701070
Thread-B end at 1593665701075
可以看到A线程从执行到结束经过了6毫秒左右,而从线程B执行到线程B返回也只经过了6毫秒左右,然而实际B线程执行到结束经过了11毫秒左右。这就说明B线程由于等待超时已经从join()方法中返回。
上面的例子实际执行时间会比我们程序设置的等待时间多出1-2毫秒,这是由于线程等待和唤醒之间的切换所消耗的时间。