Java-解读Java 杂谈

手写BlockingQueue及源码剖析(看了肯定不后悔,不看后

2019-08-17  本文已影响31人  可爱猪猪

作者:可爱猪猪 - 帅锅一枚
作者的网名很阔爱,如果喜欢本文章一定要点 喜欢 或者 打赏,拜托~
作者一直在进步,需要你们的支持和鼓励,谢谢!
人生理想:在程序猿界混出点名堂!
出来混不容易,有钱的出个赏钱,没钱的点个喜欢❥(^_-),实在没钱加个粉丝也可以^_^

今天虽然作者吹着小风扇,热的汗流浃背的,但是还是要跟大家交流一下jdk的BlockingQueue的实现及一些底层原理,也是由于最近在看《JAVA并发编程实战》这一本书的第14章“构建自定义的同步工具”,推荐阅读此书,在JDK中,作者在其他文章中也提到有ArrayBlockingQueue和LinkedBlockingQueue等,今天主要从两个角度跟大家交流下代码和书中的俩个精髓点,好了还是废话不多说,坚持读下去,以下内容可是作者读了一个星期的书领悟出来的,先看作者写的模拟代码:

/**
 * 
 * @author 可爱猪猪
 * @param <T>
 */
public class BoundedQueue<T> {
    private int capicity;
    private int count = 0;
    private Object putLock = new Object();
    private Object takeLock = new Object();

    public BoundedQueue(int capicity) {
        this.capicity = capicity;
    }

    public void put(T t) throws InterruptedException {
        synchronized (putLock) {
            if (isFull()) {
                putLock.wait();
            }
            enqueue(t);
        }

        notifyTakeLock();
    }

    /**
     * 队列中获取元素
     * @throws InterruptedException
     */
    public T take() throws InterruptedException {
        T t;
        synchronized (takeLock) {
            while (isEmpty()) {
                takeLock.wait();
            }
            t = dequeue();
        }
        notifyPutLock();
        return t;
    }

    /**
     * 唤醒putLock
     */
    private void notifyPutLock() {
        synchronized (putLock) {
            putLock.notifyAll();
        }
    }

    /**
     * 唤醒takeLock
     */
    private void notifyTakeLock() {
        synchronized (takeLock) {
            takeLock.notifyAll();
        }
    }

    /**
     * 入列
     */
    private void enqueue(T t) {
        // 省略入列逻辑
        // ...

        // 计算队列的大小
        count++;
    }

    /**
     * 出列
     */
    public T dequeue() {
        // 省略出列逻辑
        // ...

        // 计算队列的大小

        count--;
        return null;
    }

    /**
     * 判断队列是否已满
     * @return
     */
    public boolean isFull() {
        return count == capicity;
    }

    /**
     * 判断队列是否为空
     * @return
     */
    public boolean isEmpty() {
        return count == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        // 初始化capicity=2的有界队列
        final BoundedQueue<String> q = new BoundedQueue<String>(2);
        
        // 把队列撑满
        q.put(null);
        q.put(null);

        // 已满,两个线程,让put方法等待
        new Thread(() -> {
            try {
                q.put(null);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                q.put(null);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 唤醒put线程
        new Thread(() -> {
            try {
                q.take();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }).start();

    }

}

如果有BUG,一定评论告诉我哦。
作者总是思路很清晰的按套路出牌,好了,开始我的表演了:

这段代码是什么意思,main中的逻辑想证明什么

一个有界队列、支持put(队列已满不允许put)、支持take(队列已为空不允许take)

书中给出条件等待的标准形式,如下,为什么要进行while循环呢?改成if有什么问题吗?作者带着疑问写了一开始的代码:

// 必须通过一个锁来保护条件谓词
synchronized(lock){
  while(!conditionPredicat()){
          lock.wait();
  }
}

JDK实现也按照这个标准形式实现,如下:

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
       // 开始加锁
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
           // 循环等待
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // finally中释放锁
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

进入解答“为什么是while不是if?”的环节?

先看下作者在main中的准备的测试用例,逻辑如下图:

image.png
图上作者已经标明while和if的执行逻辑,仔细琢磨6和7两步,作者的例子很具有代表性

总结,作者开篇提到收获到两点

synchronized(waitLock){
           waitLock.wait();
          //  notifyTakeLock() ; 等于以下代码
          synchronized(takeLock){
                 takeLock.notifyAll();
          }
}

而是要这样写

synchronized(waitLock){
           waitLock.wait();
          //  notifyTakeLock() ; 等于以下代码
}

 synchronized(takeLock){
               takeLock.notifyAll();
 }

看过《JAVA并发编程实战》中其实有一章节“死锁”专门解释过,嵌套锁如果不保持锁的顺序一致性容易造成死锁,写锁的时候一定要注意减小锁的粒度和减少锁的调用及嵌套,避免死锁。

上一篇下一篇

猜你喜欢

热点阅读