Java 杂谈

Java 利用wait和notify实现阻塞队列

2019-03-16  本文已影响33人  jwfy

更多并发相关内容,查看==>Java 线程&并发学习目录

wait和notify以及notifyAll 都是Object类提供的基本native本地方法,可提供多线程间协作功能,可以控制线程的暂停和唤醒等操作,也就可以实现简洁版本的消费者生产者模型。接下来就学习下其使用特点,最后利用wait和notify编写一个生产者消费者模型的队列。

wait和notify的特点

image
private boolean flag = true;

public synchronized void deal() {
    try {
        while(!flag) {
            this.wait();
            // 这个this就是指当期对象,同时synchronized也是同步方法,指定的监视器是同一个
            // 而且需要注意这个while 循环
        }
        flag = false;
        this.notify();
        // 理由同上
        System.out.println("唤醒了一个线程")
    } catch (Execption e) {
        e.printStackTrace();
    }
}

现在假设有线程A、B、C,其中AB都已经经过wait进入到了等待状态,现在线程C中执行了notify()之后,线程A被唤醒,可是这个时候flag值已经被线程C改成了false,那么while就是继续成立的,那么就使得刚刚被唤醒的线程A很快再次执行wait() 进入到等待状态,使得所有的线程全部进入等待,没有线程可以运行,出现死锁状态

这点也是使用wait和notify的缺点,无法指定类型的线程唤醒操作。如果不注意的情况下使用了notify(),就会因为唤醒了不合适的线程出现死锁

生产者消费者模型实践

生产者消费者模型就是当队列为空时消费者停止消费,当队列满时生产者停止生产。利用wait 和 notify 协调生产者线程和消费者线程的关系,再加上一个数组作为队列的容器,生产者的偏移量以及消费者的偏移量就可以完成一个简单的消费者生产者模型,阻塞队列原理也基本类似。具体如下代码

public class WaitAndNotifyBlockQueue<E> {

    private Object OBJ = new Object();
    private Object[] items;
    private int count = 0;
    private int productIndex = 0;
    private int consumerIndex = 0;

    public WaitAndNotifyBlockQueue(int count) {
        this.items = new Object[count];
        printf();
    }

    public void put(E e) {
        synchronized (OBJ) {
            try {
                while (count >= items.length) {
                    OBJ.wait();
                }
                if (productIndex+1 > items.length) {
                    productIndex = 0;
                }
                items[productIndex++] = e;
                count += 1;
                System.out.println(Thread.currentThread().getName() + " product:" + e);
                printf();
                OBJ.notify();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    public E get() {
        synchronized (OBJ) {
            try {
                while (count <=0) {
                    OBJ.wait();
                }

                if (consumerIndex+1 > items.length) {
                    consumerIndex = 0;
                }
                E e = (E) items[consumerIndex++];
                count -= 1;
                System.out.println(Thread.currentThread().getName() + " consumer:" + e);
                printf();
                OBJ.notify();
                return e;
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        return null;
    }

    public int getCount() {
        synchronized (OBJ) {
            return count;
        }
    }

    private void printLine() {
        System.out.println();
        for (int i = 0; i < items.length; i++) {
            System.out.print("-----");
        }
        System.out.println();
    }

    // 便于查看数据而已,无实际用途
    public void printf() {
        synchronized (OBJ) {
            for (int i = 1; i <= items.length; i++) {
                if (i == productIndex) {
                    System.out.print("  ⬇  ");
                } else {
                    System.out.print("     ");
                }
            }
            printLine();
            for (int i = 0; i < items.length; i++) {
                Object num = items[i];
                if (num != null) {
                    System.out.printf("%4d", items[i]);
                } else {
                    System.out.print("NULL");
                }
                System.out.print("|");
            }
            printLine();
            for (int i = 1; i <= items.length; i++) {
                if (i == consumerIndex) {
                    System.out.print("  ⬆  ");
                } else {
                    System.out.print("     ");
                }
            }
            System.out.println("\nstart: " + consumerIndex  + ", end: " + productIndex + ", count: " + count + "\n\n");
        }
    }
}

创建了一个泛型类,包装了一个Object数组,有一个公用的Object锁对象,后面的synchronized都是锁住这个对象,再加个测试用例

WaitAndNotifyBlockQueue<Integer> collection = new WaitAndNotifyBlockQueue(20);
// 队列长度为20
Runnable productRunnable = () -> {
    int n = 20;
    while (n-- > 0) {
        collection.put(new Random().nextInt(100));
        // 生产者随机生成数字添加到队列中
    }
};

Runnable consumerRunnable = () -> {
    int n = 5;
    while (n-- > 0) {
        collection.get();
    }
};

//  5个消费线程,每个线程消费5个
//  1个生成线程,每个线程生成20个

for (int i = 0 ; i < 1; i++) {
    new Thread(productRunnable, "Producter " + i).start();
}

try {
    Thread.sleep(1000 * 1);
} catch (InterruptedException e) {
    e.printStackTrace();
}

for (int i = 0 ; i < 5; i++) {
    new Thread(consumerRunnable, "Consumer " + i).start();
}

分别创建了不同数量个生产者和消费者,分别处理不同数量的数据,如下图最后执行的部分效果图

image
image

更多

本人微信公众号(搜索jwfy)欢迎关注

微信公众号
上一篇 下一篇

猜你喜欢

热点阅读