Java-解读Java 杂谈

BlockingQueue的存与取【史上最清晰】

2019-08-08  本文已影响9人  可爱猪猪

作者:可爱猪猪 - 帅锅一枚
作者的网名很阔爱,如果喜欢本文章一定要点 喜欢 或者 打赏,拜托~
作者一直在进步,需要你们的支持和鼓励,谢谢!
人生理想:在程序猿界混出点名堂!

1.是什么什么是BlockingQueue

BlockingQueue为阻塞队列,比如线程池的构建就使用了BlockingQueue
比如LinkedBlockingQueue和ArrayBlockingQueue等,还有如下:

BlockingQueue.jpg

Notice:以下讨论以LinkedBlockingQueue为例:

2.储备BlockingQueue的界限

界限为这个队列的最大容量,如果超过界限,会有对应的处理机制。

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

3.看结果BlockingQueue的存与取,直接看存取方法区别

方法名称 是否有返回 超出capacity处理
offer 返回false
put 阻塞
add 抛出异常

方法名称 读取后是否移除队列 队列为空处理
poll 移除 返回null
take 移除 阻塞
peek 不移除 返回null

4.知其所以然BlockingQueue的存与取,对边界处理机制

:offer()、 put()、 add()

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
       // 超出界限,直接返回失败
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // 追加到队列
                enqueue(node);
                //队列总数+1
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    // 如果增加了反而没满,通知put方法释放等待
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
          // 如果队列已满,阻塞等待,直到通知解锁,否则就添加成功
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
   }
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

:poll() 、take() 、peek(),peek英文为窥视

 public E poll() {
        final AtomicInteger count = this.count;
       //如果队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                //并从队列移除
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           // 如果队列为空,则阻塞等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
public E peek() {
        // 如果队列为空则返回空
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 并未从队列移除,所以说只是偷窥,并未做移除操作
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
上一篇下一篇

猜你喜欢

热点阅读