一些收藏Java技术升华面试精选

Java并发编程——LinkedBlockingQueue

2021-12-18  本文已影响0人  小波同学

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:


如上图:

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

2、boolean offer(E e)

3、void put(E e)

4、boolean offer(E e, long timeout, TimeUnit unit)

5、E take()

6、E poll( long time, timeunit unit)

7、boolean remove()

8、E element()

9、E peek()

注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、LinkedBlockingQueue

LinkedBlockingQueue也是一个阻塞队列,相比于ArrayBlockingQueue,他的底层是使用链表(单向链表)实现的,而且是一个可有界可无界的队列,在生产和消费的时候使用了两把锁,提高并发,是一个高效的阻塞队列。

LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    //链表节点定义    
    static class Node<E> {
        //节点中存放的值
        E item;

        //下一个节点
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
}   

从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。

LinkedBlockingQueue的基本成员属性如下代码所示:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 队列容量,最大为Integer.MAX_VALUE */
    private final int capacity;

    /** 队列长度 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 头结点
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * 尾结点
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** 移除操作的锁,take/poll方法用到 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 移除操作需要等待的条件notEmpty,与takeLock绑定 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入队操作的锁,put/offer方法用到 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入队操作需要等待的条件notFull,与putLock绑定 */
    private final Condition notFull = putLock.newCondition();
}   

可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

2.1、构造函数

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    public LinkedBlockingQueue() {
        // 调用有参构造函数,初始化容量capacity为int最大值
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        // 容量不能小于0,注意也不能等于0,这点与常规的集合不同
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 初始化头结点和尾结点为哑节点
        last = head = new Node<E>(null);
    }
    
    // 将已有的集合全部入队
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        // 获取到putLock锁
        final ReentrantLock putLock = this.putLock;
        // 加锁,保证线程安全
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                // 节点内的值不能为null
                if (e == null)
                    throw new NullPointerException();
                // 判断队列是否满了 
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                // 将Node节点添加到队列的尾部,last = last.next = new Node<E>(e);   
                enqueue(new Node<E>(e));
                ++n;
            }
            // 原子类设置Node节点个数,线程安全
            count.set(n);
        } finally {
            // 解锁
            putLock.unlock();
        }
    }   
}   

2.2、阻塞入队

LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put。

2.2.1、add(E e)方法

其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
    
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
}

2.2.2、offer(E e)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e) {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        // 获取队列元素个数
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            //如果已经满了,直接返回失败
            return false;
        // 预先设置c为 -1,约定负数为入队失败  
        int c = -1;
        Node<E> node = new Node<E>(e);
        // 获取入队锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //双重判断
            if (count.get() < capacity) {
                //加入链表
                enqueue(node);
                c = count.getAndIncrement();
                // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
                if (c + 1 < capacity)
                    //唤醒生产者线程,继续插入
                    // 如果添加数据后还队列还没有满,
                    //则继续调用notFull的signal方法唤醒其他等待在入队的线程,继续插入
                    notFull.signal();
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
        if (c == 0)
            //说明里面有一个元素,唤醒消费者
            signalNotEmpty();
        return c >= 0;
    }
}   

2.2.3、offer(E e, long timeout, TimeUnit unit)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        // 预先设置c为 -1,约定负数为入队失败
        int c = -1;
        // 获取入队锁
        final ReentrantLock putLock = this.putLock;
        // 获取队列元素个数
        final AtomicInteger count = this.count;
        // 加锁
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                // 如果超时时间过了队列仍然是满的话就直接返回false
                if (nanos <= 0)
                    return false;
                // 否则调用awaitNanos等待,超时会返回<= 0L  
                nanos = notFull.awaitNanos(nanos);
            }
            // 如果上述没有阻塞,也就是队列没有满,那么这里直接入队
            enqueue(new Node<E>(e));
            // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                // 如果添加数据后还队列还没有满,
                //则继续调用notFull的signal方法唤醒其他等待在入队的线程
                notFull.signal();
        } finally {
            // 释放锁
            putLock.unlock();
        }
        // c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程
        // 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多
        if (c == 0)
            signalNotEmpty();
        return true;
    }
}   

我们一起总结一下上述的入队源码:

2.2.4、put(E e)方法

对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public void put(E e) throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        // 预先设置c为 -1,约定负数为入队失败
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // 使用AtomicInteger保证原子性
        final AtomicInteger count = this.count;
        // 获取put锁
        putLock.lockInterruptibly();
        try {
            // 如果队列满了,则进入put条件队列等待
            while (count.get() == capacity) {
                notFull.await();
            }
            // 队列不满,或者被取数线程唤醒了,那么会继续执行
            // 这里会往阻塞队列末尾添加一个数据
            enqueue(node);
            c = count.getAndIncrement();
            // 如果队列不满,则唤醒等待时间最长的put线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放put锁
            putLock.unlock();
        }
        // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
        if (c == 0)
            signalNotEmpty();
    }
    
    //直接放到链表的尾部
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }   
}   

2.3、阻塞出队

2.3.1、remove()方法

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
    
    public E remove() 
        // 调用poll()方法出队
        E x = poll();
        if (x != null)
            // 如果有元素出队就返回这个元素
            return x;
        else
            // 如果没有元素出队就抛出异常
            throw new NoSuchElementException();
    }
}

2.3.2、poll()方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    public E poll() {
        final AtomicInteger count = this.count;
        //如果队列为空,直接返回空
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        // 上锁
        takeLock.lock();
        try {
            // 如果队列不空
            if (count.get() > 0) {
                //调用dequeue获取队列中的数据
                x = dequeue();
                // 阻塞队列数量减1
                c = count.getAndDecrement();
                // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
                if (c > 1)
                    // 释放take锁
                    notEmpty.signal();
            }
        } finally {
            // 解锁
            takeLock.unlock();
        }
        // 如果c == capacity就是说队列中有一个空位,唤醒入队线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
}   

2.3.3、poll(long timeout, TimeUnit unit)方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        // 上锁
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // 如果队列空了,则进入take条件队列等待
                // 且如果阻塞时间过期,那么将返回null
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 在超时时间内返回,则调用dequeue获取队列中的数据
            x = dequeue();
            // 阻塞队列数量减1
            c = count.getAndDecrement();
            // 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 解锁
            takeLock.unlock();
        }
        // 如果c == capacity就是说队列中有一个空位,唤醒入队线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
}   

2.3.4、take()方法

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列空了,则进入take条件队列等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 获取到第一个节点,非哑节点
            x = dequeue();
            // 阻塞队列数量减1
            c = count.getAndDecrement();
            // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放take锁
            takeLock.unlock();
        }
        // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //通过这个方法可以看出,链表的首节点的值是null,每次获取元素的时候
    //先把首节点干掉,然后从第二个节点获取值
    private E dequeue() {
        Node<E> h = head;
        // 获取第一个元素结点first
        Node<E> first = h.next;
        // 将头结点自引用,并被垃圾回收掉
        h.next = h; // help GC
        // 将头结点指向第一个元素结点first
        head = first;
        // 获取第一个元素结点的值
        E x = first.item;
        // 将第一个元素结点的值置为null,成为新的哑节点
        first.item = null;
        // 返回被移除的节点元素值
        return x;
    }   
}   

take和put操作如下图所示:


三、ArrayBlockingQueue与LinkedBlockingQueue对比

队列 是否阻塞 是否有界 线程安全 适用场景
ArrayBlockingQueue 一把ReentrantLock锁 生产消费模型,平衡处理速度
LinkedBlockingQueue 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度

3.1、ArrayBlockingQueue

3.2、LinkedBlockingQueue:

参考:
https://www.itzhai.com/articles/graphical-blocking-queue.html

https://segmentfault.com/a/1190000039174436

https://cloud.tencent.com/developer/article/1609320

上一篇 下一篇

猜你喜欢

热点阅读