Java 利用wait和notify实现阻塞队列
2019-03-16 本文已影响33人
jwfy
更多并发相关内容,查看==>Java 线程&并发学习目录
wait和notify以及notifyAll 都是Object类提供的基本native本地方法,可提供多线程间协作功能,可以控制线程的暂停和唤醒等操作,也就可以实现简洁版本的消费者生产者模型。接下来就学习下其使用特点,最后利用wait和notify编写一个生产者消费者模型的队列。
wait和notify的特点
- wait() 、notify() 使用前需要获取当前对象监视器Monitor对象,一般情况和synchronized关键字配合使用
- wait() 方法调用后会使得当前所在的线程暂停运行,并且进入到Monitor监视器的Wait Set集合中,具体可看如下图
- notify() 方法是可以随机唤醒一个在WaitSet集合内的线程进入到EntryList(图中的EntrySet)中,这会导致死锁,例如如下代码块
private boolean flag = true;
public synchronized void deal() {
try {
while(!flag) {
this.wait();
// 这个this就是指当期对象,同时synchronized也是同步方法,指定的监视器是同一个
// 而且需要注意这个while 循环
}
flag = false;
this.notify();
// 理由同上
System.out.println("唤醒了一个线程")
} catch (Execption e) {
e.printStackTrace();
}
}
现在假设有线程A、B、C,其中AB都已经经过wait进入到了等待状态,现在线程C中执行了notify()之后,线程A被唤醒,可是这个时候flag值已经被线程C改成了false,那么while就是继续成立的,那么就使得刚刚被唤醒的线程A很快再次执行wait() 进入到等待状态,使得所有的线程全部进入等待,没有线程可以运行,出现死锁状态
这点也是使用wait和notify的缺点,无法指定类型的线程唤醒操作。如果不注意的情况下使用了notify(),就会因为唤醒了不合适的线程出现死锁
- notifyAll() 则是可以唤醒所有wait的线程,需要注意的是唤醒了并不意味着执行了,唤醒这个操作只是相当于所有的WaitSet集合的线程进入到EntryList中,随后还是需要通过竞争和CPU时间片获取才可以真正的继续执行线程,这就存在一个问题了,如果恰好最后执行的线程又是不合适的线程,同样导致死锁。有人会说可以调低线程的优先级,可是这更不好控制线程工作,反而会出现饥饿的情况。所以notifyAll() 依旧可能会出现死锁,只是概率降低了,在一般场景下如果可以预见线程的情况,优先建议使用notifyAll()
- wait以及notify的暂停和唤醒是没有先后顺序的,也就使得在开发中必须保证wait在notify之前被调用
生产者消费者模型实践
生产者消费者模型就是当队列为空时消费者停止消费,当队列满时生产者停止生产。利用wait 和 notify 协调生产者线程和消费者线程的关系,再加上一个数组作为队列的容器,生产者的偏移量以及消费者的偏移量就可以完成一个简单的消费者生产者模型,阻塞队列原理也基本类似。具体如下代码
public class WaitAndNotifyBlockQueue<E> {
private Object OBJ = new Object();
private Object[] items;
private int count = 0;
private int productIndex = 0;
private int consumerIndex = 0;
public WaitAndNotifyBlockQueue(int count) {
this.items = new Object[count];
printf();
}
public void put(E e) {
synchronized (OBJ) {
try {
while (count >= items.length) {
OBJ.wait();
}
if (productIndex+1 > items.length) {
productIndex = 0;
}
items[productIndex++] = e;
count += 1;
System.out.println(Thread.currentThread().getName() + " product:" + e);
printf();
OBJ.notify();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
public E get() {
synchronized (OBJ) {
try {
while (count <=0) {
OBJ.wait();
}
if (consumerIndex+1 > items.length) {
consumerIndex = 0;
}
E e = (E) items[consumerIndex++];
count -= 1;
System.out.println(Thread.currentThread().getName() + " consumer:" + e);
printf();
OBJ.notify();
return e;
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
return null;
}
public int getCount() {
synchronized (OBJ) {
return count;
}
}
private void printLine() {
System.out.println();
for (int i = 0; i < items.length; i++) {
System.out.print("-----");
}
System.out.println();
}
// 便于查看数据而已,无实际用途
public void printf() {
synchronized (OBJ) {
for (int i = 1; i <= items.length; i++) {
if (i == productIndex) {
System.out.print(" ⬇ ");
} else {
System.out.print(" ");
}
}
printLine();
for (int i = 0; i < items.length; i++) {
Object num = items[i];
if (num != null) {
System.out.printf("%4d", items[i]);
} else {
System.out.print("NULL");
}
System.out.print("|");
}
printLine();
for (int i = 1; i <= items.length; i++) {
if (i == consumerIndex) {
System.out.print(" ⬆ ");
} else {
System.out.print(" ");
}
}
System.out.println("\nstart: " + consumerIndex + ", end: " + productIndex + ", count: " + count + "\n\n");
}
}
}
创建了一个泛型类,包装了一个Object数组,有一个公用的Object锁对象,后面的synchronized都是锁住这个对象,再加个测试用例
WaitAndNotifyBlockQueue<Integer> collection = new WaitAndNotifyBlockQueue(20);
// 队列长度为20
Runnable productRunnable = () -> {
int n = 20;
while (n-- > 0) {
collection.put(new Random().nextInt(100));
// 生产者随机生成数字添加到队列中
}
};
Runnable consumerRunnable = () -> {
int n = 5;
while (n-- > 0) {
collection.get();
}
};
// 5个消费线程,每个线程消费5个
// 1个生成线程,每个线程生成20个
for (int i = 0 ; i < 1; i++) {
new Thread(productRunnable, "Producter " + i).start();
}
try {
Thread.sleep(1000 * 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0 ; i < 5; i++) {
new Thread(consumerRunnable, "Consumer " + i).start();
}
分别创建了不同数量个生产者和消费者,分别处理不同数量的数据,如下图最后执行的部分效果图
imageimage
更多
- 关于wait和notify的底层native实现,可以看看 http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/111b95ad4584/src/share/vm/runtime/objectMonitor.cpp
- LockSupport的pack和unpack也是native方法,pack暂停线程后不会释放锁
- 在JUC中的条件队列就实现了指定特定线程的唤醒操作,阻塞队列BlockingQueue就是使用了条件队列完成消费者和生产者模型的
- Tomcat早期版本就是使用wait和notify完成链接器和处理器的套接字传递的
本人微信公众号(搜索jwfy)欢迎关注
微信公众号