程序员java架构经验分享

万字长文!阿里面试官总结出来的线程通信笔记(建议收藏)

2020-10-19  本文已影响0人  前程有光

关于线程通信

在客户端与服务端的通信过程中,一般情况下有两种通信模型,它们分别是同步阻塞模型异步非阻塞模型

对于同步阻塞模型,当请求的事件出于某种原因没有返回结果时,此时线程将一直处于阻塞状态而不进行其他操作,例如我们去商场买电脑,我们需要等待卖家对我们的电脑做一系列的售后工作,此时我们并不能去做其他事情,只能等待卖家完成这些工作,这种模式就是同步阻塞模型。

而对于异步非阻塞模型,当客户端请求事件时,服务器会先返回其事件所工作的线程工号,此时客户端可以继续进行其他操作,等待该事件处理完成,并通过工号查询所请求的事件结果,例如我们去京东上买电脑的话,我们下完单后就可以继续浏览其他页面或者做我们其他的事情,只需要等待电脑到后签收即可,不需要等待卖家的售后行为,这种模式就是异步非阻塞模型。

那么在异步非阻塞模型下,我们如何知道当前线程已经处理事件完成了呢?除了使用工号查询外,我们可以引入线程通信来保证各个线程之间可以相互知道事件处理的情况。

线程与线程之间不是相互独立的个体,它们彼此之间需要相互通信和协作。比如说最经典的 生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权。因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去。因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态。然后等待消费者消费了商品,然后消费者通知生产者队列有空间了。同样地,当队列空时,消费者也必须等待,等待生产者通知它队列中有商品了。这种互相通信的过程就是线程间的协作。

线程通信常见的两种方式

syncrhoized加锁的线程的Object类的wait()/notify()/notifyAll()

wait()、notify()和notifyAll()是Object类中的方法:

/**
 * Wakes up a single thread that is waiting on this object's
 * monitor. If any threads are waiting on this object, one of them
 * is chosen to be awakened. The choice is arbitrary and occurs at
 * the discretion of the implementation. A thread waits on an object's
 * monitor by calling one of the wait methods
 */
public final native void notify();

/**
 * Wakes up all threads that are waiting on this object's monitor. A
 * thread waits on an object's monitor by calling one of the
 * wait methods.
 */
public final native void notifyAll();

/**
 * Causes the current thread to wait until either another thread invokes the
 * {@link java.lang.Object#notify()} method or the
 * {@link java.lang.Object#notifyAll()} method for this object, or a
 * specified amount of time has elapsed.
 * <p>
 * The current thread must own this object's monitor.
 */
public final native void wait(long timeout) throws InterruptedException;

从这三个方法的文字描述可以知道以下几点信息:

有朋友可能会有疑问:为何这三个不是Thread类声明中的方法,而是Object类中声明的方法(当然由于Thread类继承了Object类,所以Thread也可以调用者三个方法)?其实这个问题很简单,由于每个对象都拥有monitor(即锁),所以<mark style="box-sizing: border-box; outline: 0px; background-color: rgb(248, 248, 64); color: rgb(0, 0, 0); overflow-wrap: break-word;">让当前线程等待某个对象的锁,当然应该通过这个对象来操作了</mark>。而不是用当前线程来操作,因为当前线程可能会等待多个线程的锁,如果通过线程来操作,就非常复杂了。

方法调用过程中存在的问题

上面已经提到,如果调用某个对象的wait()方法,当前线程必须拥有这个对象的monitor(即锁),因此调用wait()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。如果当前线程没有这个对象的锁就调用wait()方法,则会抛出IllegalMonitorStateException

public class TestDemo {
    public static void main(String[] args) {
        String lock  = new String("test");
        try {
            lock.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 问题:IllegalMonitorStateException,没有用到监视器锁,所以会出现
         * 异常
         */
    }
}

Exception in thread "main" java.lang.IllegalMonitorStateException

调用某个对象的wait()方法,相当于让当前线程交出(释放)此对象的monitor,然后进入等待状态,等待后续再次获得此对象的锁(Thread类中的sleep方法使当前线程暂停执行一段时间,从而让其他线程有机会继续执行,但它并不释放对象锁)。

public class TestDemo {
    public static void main(String[] args) {
        String lock  = new String("test");
        synchronized (lock){
            try {
                lock.wait();
                System.out.println("the current running thread is "+Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /**
         * wait()之后使得当前线程阻塞,wait方法之后的代码是不会执行的,释放当前所拥有的
         * monitor lock
         */
     }
}

要注意一点,一个线程被唤醒不代表立即获取了对象的monitor,只有等调用完notify()或者notifyAll()并退出synchronized块,释放对象锁后,其余线程才可获得锁执行。

public class TestDemo {
    public static void main(String[] args) {
        String lock  = new String("test");
        new Thread("A"){
            @Override
            public void run() {
                synchronized (lock){
                    try {
                        lock.wait();
                        System.out.println("the current running thread is "+Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("B"){
            @Override
            public void run() {
                synchronized (lock){
                    System.out.println("开始notify time: "+System.currentTimeMillis());
                    lock.notify();
                    System.out.println("结束notify time: "+System.currentTimeMillis());
                }
            }
        }.start();

        /**
         * notify()方法执行后并不会立即释放锁
         */
     }
}

开始notify time: 1602999818924
结束notify time: 1602999818924
the current running thread is A

public class TestDemo {
    public static void main(String[] args) {
        String lock  = new String("test");
        Thread thread = new Thread("A"){
            @Override
            public void run() {
                synchronized (lock){
                    try {
                        lock.wait();
                        System.out.println("the current running thread is "+Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        System.out.println("the thread has been interrupted and the state is "+Thread.currentThread().isInterrupted());
                    }
                }
            }
        };
        thread.start();
        thread.interrupt();

        /**
         * wait是可中断方法,可中断方法会收到中断异常InterruptedException,同时interrupt标识也会被擦除
         */
    }
}

the thread has been interrupted and the state is false

线程通信中的相关概念

notify()和notifyAll()的区别

举个简单的例子:假如有三个线程Thread1、Thread2和Thread3都在等待对象objectA的monitor,此时Thread4拥有对象objectA的monitor,当在Thread4中调用objectA.notify()方法之后,Thread1、Thread2和Thread3只有一个能被唤醒。注意,被唤醒不等于立刻就获取了objectA的monitor。假若在Thread4中调用objectA.notifyAll()方法,则Thread1、Thread2和Thread3三个线程都会被唤醒,至于哪个线程接下来能够获取到objectA的monitor就具体依赖于操作系统的调度了。

sleep()和wait()的区别

生产者消费者模型

所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架满的时候,消费者可以从货架上拿走商品,生产者此时等待货架的空位,这样不断的循环。那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者模型。

阻塞队列:

代码实现:

class BlockingQueue<E> {
    private final LinkedList<E> queue = new LinkedList<>();
    private static int max; //表示阻塞队列存储元素的最大个数
    private static final int DEFAULT_MAX_VALUE = 10;

    public BlockingQueue() {
        this(DEFAULT_MAX_VALUE);
    }

    public BlockingQueue(int max) {
        this.max = max;
    }

    //生产数据
    public void put(E value) {
        synchronized (queue) {
            //判断当前队列是否有位置存放新生产的数据
//            if(queue.size() >= max){ //单生产者
            while (queue.size() >= max) { //多生产者
                System.out.println(Thread.currentThread().getName() + " :: queue is full");
                try {
                    //没有位置,当前生产数据的线程需要阻塞
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + ":: the new value " + value + " has been produced");
            queue.addLast(value);
            queue.notifyAll(); //期望唤醒消费者线程 (wait/notifyAll不能唤醒指定条件上的线程)
        }
    }

    //消费数据
    public E take() {
        synchronized (queue) {
            //判断当前队列是否存在可消费的数据
//            if (queue.isEmpty()) { //单消费者
            while (queue.isEmpty()) { //多消费者
                System.out.println(Thread.currentThread().getName() + " :: queue is empty");
                try {
                    //不存在,则调用消费数据的线程阻塞
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            E result = queue.removeFirst();
            queue.notifyAll(); //期望唤醒生产者线程 (wait/notifyAll不能唤醒指定条件上的线程)

            System.out.println(Thread.currentThread().getName() + ":: the value " + result + " has been consumed");
            return result;
        }
    }
}

public class ProducerAndConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new BlockingQueue<Integer>();
        new Thread("Producer") {
            @Override
            public void run() {
                while (true) {
                    queue.put((int) (Math.random() * 1000));
                }
            }
        }.start();

        new Thread("Consumer") {
            @Override
            public void run() {
                while (true) {
                    queue.take();

                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

class BlockQueue<E>{
    private LinkedList<E> queue = new LinkedList<>();
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private static final int DEFAULT_MAX_VALUE = 10;
    private static int max; //表示阻塞队列存储元素的最大个数

    public BlockQueue(){
        this(DEFAULT_MAX_VALUE);
    }

    public BlockQueue(int max){
        this.max = max;
    }

    //生产数据
    public void put(E value ){
        lock.lock();
        try {
            //判断当前队列是否有位置存放新生产的数据
//            if(queue.size() >= max){ //单生产者
            while (queue.size() >= max) { //多生产者
                System.out.println(Thread.currentThread().getName() + " :: queue is full");
                //没有位置,当前生产数据的线程需要阻塞
                condition.await();
            }
            queue.addLast(value);
            System.out.println(Thread.currentThread().getName() + ":: the new value " + value + " has been produced");
            condition.signalAll(); //期望唤醒消费者线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    //消费数据
    public void take(){
        lock.lock();
        try {
            //判断当前队列是否存在可消费的数据
//            if (queue.isEmpty()) { //单消费者
            while (queue.isEmpty()) { //多消费者
                System.out.println(Thread.currentThread().getName() + " :: queue is empty");
                condition.await(); //不存在,则调用消费数据的线程阻塞
            }
            E result = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + ":: the value " + result + " has been consumed");
            condition.signalAll(); //期望唤醒生产者线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ProducerAndConsumerDemo {
    public static void main(String[] args) {
        BlockQueue<Integer> queue = new BlockQueue<Integer>();
        new Thread("Producer") {
            @Override
            public void run() {
                while (true) {
                    queue.put((int) (Math.random() * 1000));
                }
            }
        }.start();

        new Thread("Consumer") {
            @Override
            public void run() {
                while (true) {
                    queue.take();

                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

最后

感谢你看到这里,文章有什么不足还请指正,觉得文章对你有帮助的话记得给我点个赞,每天都会分享java相关技术文章或行业资讯,欢迎大家关注和转发文章!

上一篇下一篇

猜你喜欢

热点阅读