Java并发(3)-- 线程间通信&生产者消费者问题和哲学家就餐

2020-03-04  本文已影响0人  kolibreath
守望先锋 猎空

本文主要分两个章节,先对线程间通信机制的介绍,然后通过对生产者问题和哲学家问题的解决对线程的基础部分收尾

  1. 线程间通信机制
    1.1 使用同步机制
    1.2 使用轮询机制
    1.3 使用wait/notify
    1.4 使用Lock/Condition
  2. 两个经典问题
    2.1 哲学家问题死锁的解决
    2.2 生产者消费者问题

线程间通信机制

同步机制

使用关键字volatilesynchronized ,前面几篇文章已经说明了这个问题,这里不再重复

使用轮询机制

public class SpinLockTest {


    private static CountDownLatch latch = new CountDownLatch(100);
    private AtomicReference<Thread> ref = new AtomicReference<>();

    public void lock() {
        Thread currentThread = Thread.currentThread();
        while (!ref.compareAndSet(null, currentThread)) {
        }
    }

    public void unLock() {
        Thread thread = Thread.currentThread();
        ref.compareAndSet(thread,null);
    }


    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        SpinLockTest test = new SpinLockTest();
        int count[] = {0};
        for (int i = 0; i < 100 ; i++) {
            service.execute(new Thread(() -> {
                test.lock();
                count[0] ++;
                test.unLock();
                latch.countDown();
            }));
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(count[0]);
    }

}

我们使用了CountDownLatch做一个100次的倒计时,如果倒计时0时,结束阻塞。理想情况下,100个线程应该会让最后的结果变成100,而结果和我们预料的一致,假设第一个被调度的线程为A,ref.compareAndSet()返回true(当前是nullexpect的是也是nullref的值被设置成currentThread的值)。当A线程没有unlock()时,如果来一个B线程,不满足while中CAS的条件,开始while循环,B线程会一直询问有锁吗,有锁吗......直到A线程unlock为止。

B线程询问有锁吗?

我们这个例子中也实现了一个自旋锁:一个线程从在阻塞到切换成为别的线程的过程,如果只是执行简单的任务的话,切换线程上下文的时间反而比执行任务的时间还要长。所以我们可以采取自旋锁的方法进行线程的同步。

使用wait/notify

wait()notify()是定义在Object上的native方法,具体的内容有赖于各个平台的实现。

wait/notfity具体使用

  1. wait()和notify()
    wait()函数调用之后线程被挂起。调用了notify()notifyAll()之后会唤醒一个等待这个对象锁的线程,但是只有当退出对象锁的区域才行。
    对象调用notify()之后只会有一个线程去竞争锁,notifyAll()会让所有等待这个对象锁的线程去竞争锁。
  2. 具体使用
    Java中给出了一个使用wait()很明确的套路,就是使用这样的一个结构:
synchronized(object){
  //某种条件
  while(condition){
      //do something  
      wait();
     //do something else
  }
}

首先记住以下原则:

  1. wait()notify()方法必须定义在synchronized方法块中
  2. wait()通常情况下放在while块中,这主要是因为虚假唤醒问题
    下面看一段例子:
public class LockReleaseTest {

    private static Object object = new Object();

    private static class A extends  Thread{

        private Object object;
        public A(Object object){
            this.object = object;
        }
        public void run(){
            synchronized (object) {
                while (!Thread.interrupted()) {
                    try {
                        //让A线程直接wait
                        System.out.println("A进入同步代码块");
                        //wait 将线程挂起 从哪里跌倒从哪里爬起来 如果唤醒了 从这里继续运行
                        object.wait();
                        System.out.println("线程A获得了锁");

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("A退出同步代码块 退出run()");
        }
    }

    private static class B extends  Thread{

        private Object object;
        public B(Object object){
            this.object = object;
        }
        public void run() {
            synchronized (object) {
                while (!Thread.interrupted()) {
                    System.out.println("B进入同步代码块");
                    object.notify();
                    System.out.println("B通知A 从挂起中醒来,但是没有释放锁");
                    try {
                        TimeUnit.MILLISECONDS.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程B退出同步代码块");
                }
                System.out.println("线程B释放了锁");
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        A a = new A(object);
        B b = new B(object);

        a.start();
        b.start();
    }
}

A,B分别是两个线程,他们都通过一个公共的object进行同步(通过构造函数传入的),运行之后的结果如下所示:

A进入同步代码块
B进入同步代码块
B通知A 从挂起中醒来,但是没有释放锁
线程B退出同步代码块
线程B释放了锁
线程A获得了锁
A退出同步代码块 退出run()

A进入同步代码块,但是调用了wait()函数之后,线程A就挂起了。但是B线程却可以正常运行。这说明即使A线程调用了wait(),函数没有退出run()但是A线程还是放弃了锁,并且被B线程获得。此时object.notify()运行,却没有让A线程立即恢复,只有当B线程休眠结束并且退出同步代码块,A线程才能继续运行,这就解释了上面的问题,notify()执行之后没有立刻释放锁,只能等待解释同步代码块。
调用wait()方法的时候一定是获得了同步锁的,如果没有在synchronized块中调用wait()方法将抛出异常。

使用Lock 和 Condition

个人认为LockCondition是比设计在Object上的wait()notify()更容易理解的api,所有使用wait&notify的地方都还可以使用Lock&Condition处理。

生产者消费者问题

生产着消费者问题的场景是:消费者消费生产者生产出并且放在队列里面的产品,如果产品用完了消费者需要等待,如果队列满了,生产者等待。

  1. 先使用wait&notify完成:
public class ProducerAndConsumer1 {


    private static final Queue<Content> contents = new LinkedList<>();
    static class Content{
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content(){
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }
        public String toString(){
            return " start " + start + " end " + end;
        }
    }


    @SuppressWarnings("Duplicates")
    static class Producer implements Runnable{

        private int maxCount;

        public Producer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run(){
            while(true){
                synchronized (contents){
                  //使用while + wait的语义: 判断是否还要继续等待
                    while(contents.size() == maxCount){
                        System.out.println("The queue is full");
                        try {
                            contents.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        //模拟生产
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    Content content = new Content();
                    contents.add(content);
                    System.out.println("produced "+content);
                    contents.notifyAll();
                }
            }
        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements  Runnable{
        private int maxCount;
        public Consumer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run(){
            while(true){
                synchronized (contents){
                    //使用while + wait的语义: 判断是否还要继续等待
                    while(contents.size() == 0){
                        System.out.println("The queue is empty");
                        try {
                            contents.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Content content =contents.poll();
                    try {
                        //模拟消费
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("consumed " + content);
                    contents.notifyAll();
                }
            }
        }
    }


    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        int maxCount = 5;
        //3个生产者 3个消费者

        for (int i = 0; i < 3 ; i++) {
            service.execute(new Producer(maxCount));
            service.execute(new Consumer(maxCount));
        }

    }
}
  1. 使用Lock&Condition解决
public class ProducerAndConsumer2 {

    private static final Queue<Content> contents = new LinkedList<>();

    private static final Lock lock = new ReentrantLock();
    private static final Condition fullQueue = lock.newCondition();
    private static final Condition emptyQueue = lock.newCondition();

    static class Content{
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content(){
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }
        public String toString(){
            return " start " + start + " end " + end;
        }
    }


    static class Producer implements Runnable{

        private int maxCount;

        public Producer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run() {
            while (true) {
                lock.lock();
                //使用while + wait的语义: 判断是否还要继续等待
                while (contents.size() == maxCount) {
                    System.out.println("The queue is full");
                    try {
                        fullQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    //模拟生产
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                Content content = new Content();
                contents.add(content);
                System.out.println("produced " + content);
                fullQueue.signalAll();
                emptyQueue.signalAll();

                lock.unlock();
            }

        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements  Runnable{
        private int maxCount;
        public Consumer(int maxCount){
            this.maxCount = maxCount;
        }

        public void run() {
            while (true) {
                lock.lock();
                //使用while + wait的语义: 判断是否还要继续等待
                while (contents.isEmpty()) {
                    System.out.println("The queue is empty");
                    try {
                        emptyQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Content content = contents.poll();
                try {
                    //模拟消费
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumed " + content);
                fullQueue.signalAll();
                emptyQueue.signalAll();

                lock.unlock();
            }
        }
    }

Lock&Condition的用法和上文中相同。

  1. 使用阻塞队列来完成生产着消费者问题
    使用阻塞队列能够很好地帮我们托管同步的问题:
public class ProducerAndConsumer {


    private static final int maxCount = 10;
    private static final BlockingQueue<Content> queue = new LinkedBlockingDeque<>(maxCount);

    static class Content {
        private String start;
        private String end;
        private String[] places = {"Shanghai", "Wuhan", "GuangZhou", "Hangzhou"};

        public Content() {
            int index = new Random().nextInt(places.length);
            this.start = this.end = places[index];
        }

        public String toString() {
            return " start " + start + " end " + end;
        }
    }


    @SuppressWarnings("Duplicates")
    static class Producer implements Runnable {

        public void run() {
            while (true) {
                try {
                    //模拟生产
                    TimeUnit.MILLISECONDS.sleep(1000);
                    Content content = new Content();
                    queue.put(content);
                    System.out.println("produced " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

    @SuppressWarnings("Duplicates")
    static class Consumer implements Runnable {
        public void run() {
            while (true) {

                try {
                    //模拟消费
                    TimeUnit.MILLISECONDS.sleep(500);
                    Content content = queue.take();
                    System.out.println("consumed " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5 ; i++) {
            service.execute(new Producer());
            service.execute(new Consumer());
        }
    }
}

解决哲学家就餐问题

哲学家就餐问题的一种解法是,可以让最后一个人拿起的筷子固定就可以解决:

public class DeadLockTest {
    //通过哲学家问题演示一个思索的情况

    public static  class Chopstick {
        private boolean taken = false;

        public synchronized void take() throws InterruptedException {
            //反复检查是否已经被拿走 如果拿走,就算了
            while (taken) {
                wait();
            }
            taken = true;
        }

        public synchronized void drop(){
            taken = false;
            notifyAll();
        }
    }

    public static class Philosopher implements Runnable {

        private Chopstick left;
        private Chopstick right;

        private final int id;
        private final int ponderFactor;
        private Random rand = new Random(47);

        public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {
            this.ponderFactor = ponder;
            this.left = left;
            this.right = right;
            id = ident;
        }

        public void pause() throws InterruptedException {
            if (ponderFactor == 0) return;
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250));
        }

        public String toString(){
            return "Philosopher" + id;
        }
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    System.out.println(this + " " + "thinking");
                    pause();
                    //哲学家开始就餐
                    System.out.println(this + " " + "grabbing right");
                    right.take();

                    System.out.println(this + " " + "grabbing left" );
                    left.take();

                    System.out.println(this + " " + "eating");
                    pause();

                    right.drop();
                    left.drop();
                }
            } catch (InterruptedException e) {
                System.out.println(this + " " + "exiting via interrupt");
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        int ponder = 0;
        int size = 5;
        ExecutorService exec = Executors.newCachedThreadPool();
        Chopstick[] chopsticks = new Chopstick[size];
        for (int i = 0; i < size ; i++) {
            chopsticks[i] = new Chopstick();
        }

        for (int i = 0; i < size ; i++) {
            //会发生死锁
//            exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
            //死锁的解决方式
            if(i < (size - 1)){
                exec.execute(new Philosopher(chopsticks[i], chopsticks[(i +1) % size], i , ponder));
            }else{
                exec.execute(new Philosopher(chopsticks[0], chopsticks[i], i , ponder));
            }
        }

        //如果发生死锁就回卡住!
        TimeUnit.SECONDS.sleep(30);
        exec.shutdownNow();
    }
}

参考内容
使用线程间通信机制解决问题
Java 中线程间通信机制
阻塞队列

读 《Thinking in Java》有感,遂记之

上一篇下一篇

猜你喜欢

热点阅读