线程并发--Condition控制线程通信

2020-04-27  本文已影响0人  Petrel_Huang

问题:wait和notify方法,只能被同步监听锁对象来调用,否则报错IllegalMonitorStateException.
那么现在问题来了,Lock机制根本就没有同步锁了,也就没有自动获取锁和自动释放锁的概念.
因为没有同步锁,所以Lock机制不能调用wait和notify方法.

解决方案:Java5中提供了Lock机制的同时提供了处理Lock机制的通信控制的Condition接口.Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

Condition常用方法:


image.png

注意:这些方法必须是在lock方法和unlock方法中间使用,和synchronized相似。

获取Condition实现类对象

我们都知道Condition是接口,如何获取Condition实现类对象呢?

使用Lock中的newCondition方法获取Condition实现类对象。以ReentrantLock为例,newCondition创建的是AQS中的ConditionObject内部类对象。

1.1.ConditionObject结构

我们通过源代码发现Condition的实现也是通过一个队列完成的,他和我们的Lock中的队列不一样,Lock中的队列叫做同步队列,用于决定那个封装了线程的节点获取到锁资源,Condition中的队列我们成为等待队列,在调用Condition的await()方法之后,线程释放锁,构造成相应的节点进入等待队列等待。

image.png

1.2.await方法源码分析:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())//如果线程中断,抛出异常,所以才会说该方法不会进入等待队列
                throw new InterruptedException();
            Node node = addConditionWaiter();//将线程封装到一个节点中,并把该节点放入到队列的尾部
            int savedState = fullyRelease(node);//释放锁资源,并唤醒下一个节点(线程)
            int interruptMode = 0;
//循环判断node是否在同步队列中,不在同步队列中,拿不到锁,线程不能执行,继续等待
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);//线程挂起
//如果能够继续执行,证明挂起结束,则判断如果已经中断了,则退出循环,不再挂起线程
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //重新竞争同步锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //清理队列中不是非等待状态的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);//其他状态抛出异常或者是线程中断
        }

addConditionWaiter方法分析图:


image.png

同步队列和等待队列关系分析图:


image.png

调用await方法分析图:


image.png
21.3 signal方法源码分析:
        public final void signal() {
            if (!isHeldExclusively())//判断当前是否是获取锁的状态,因为线程通信前提是线程安全
                throw new IllegalMonitorStateException();//不安全报错
            Node first = firstWaiter;//获取队列头
            if (first != null)//头节点不为空
                doSignal(first);//唤醒线程
        }

doSignal方法:

        private void doSignal(Node first) {
            do {
                //如果第一个节点同时是最后一个节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;//因为要移除最后一个节点,所以设置为空
                first.nextWaiter = null;//第一个节点的下一个节点也设置为空
            } while (!transferForSignal(first) &&//唤醒挂起的线程
                     (first = firstWaiter) != null);//头节点不为空
        }

transferForSignal方法:

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//修改节点的等待状态,还原到初始状态
            return false;//修改失败然后false,成功继续往下执行
        Node p = enq(node);//将节点放入到同步队列中
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//唤醒状态修改成功
            LockSupport.unpark(node.thread);//唤醒线程
        return true;
    }

signal方法分析图:


image.png

代码演示:

public class ShareResource {
    private int num = 0;// 信号量

    private Lock lock = new ReentrantLock();
    /**
     * notify和signal有一点不同,notify是唤醒其他线程,
     * 而signal谁调用了await释放的时候需要谁调用才能释放对应的线程,因为lock是对象
     */
    private Condition cset = lock.newCondition(); // 生产者通信
    private Condition cget = lock.newCondition(); // 消费者通信

    // 生产操作
    public void set() {
        lock.lock();
        try {
            while (num >= 10) {// 10为盘子装满了
                System.out.println("盘子装满了,不能再装了!");
                cset.await();
            }
            this.num++;
            System.out.println("生产第" + num + "个包子");
            cget.signalAll();// 生产完唤醒消费者
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 消费操作
    public void get() {
        lock.lock();
        try {
            while (num <= 0) {
                System.out.println("包子吃完了,生产了才有得吃!");
                cget.await();
            }
            this.num--;
            System.out.println("消费第" + num + "个包子");
            cset.signalAll();// 消费完了唤醒生产者
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

21.4 【迅雷面试题】
编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
代码演示:

public class ShareResource {
    private int singal = 0;// 0:A线程,1:B线程,2:C线程
    private Lock lock = new ReentrantLock();
    // 多个的时候我们使用singalAll唤醒所有线程,所以使用一个等待队列即可,但是不好,因为容易出现自旋
    private Condition c = lock.newCondition();// 等待队列

    public void getA() {
        lock.lock();
        try {
            while (singal != 0) {
                c.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> A");
            singal++;
            c.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void getB() {
        lock.lock();
        try {
            while (singal != 1) {
                c.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> B");
            singal++;
            c.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void getC() {
        lock.lock();
        try {
            while (singal != 2) {
                c.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> C");
            singal = 0;
            c.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

注意:这里使用singalAll效率没有singal效率高,因为唤醒所有的线程,如果执行a之后抢到锁的是c不是b,那么c要重新回到等待队列中,知道b抢到锁为止,出现自旋的情况,降低性能。
最优方案代码:

public class ShareResource {
    private int singal = 0;// 0:A线程,1:B线程,2:C线程
    private Lock lock = new ReentrantLock();
    private Condition ca = lock.newCondition();// a等待队列
    private Condition cb = lock.newCondition();// b等待队列
    private Condition cc = lock.newCondition();// c等待队列

    public void getA() {
        lock.lock();
        try {
            while (singal != 0) {
                ca.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> A");
            singal++;
            cb.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void getB() {
        lock.lock();
        try {
            while (singal != 1) {
                cb.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> B");
            singal++;
            cc.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void getC() {
        lock.lock();
        try {
            while (singal != 2) {
                cc.await();
            }
            System.out.println(Thread.currentThread().getName() + "--> C");
            singal = 0;
            ca.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读