并发jdkJVM和并发编程

Java并发编程——LinkedBlockingDeque

2021-12-19  本文已影响0人  小波同学

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:


如上图:

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

2、boolean offer(E e)

3、void put(E e)

4、boolean offer(E e, long timeout, TimeUnit unit)

5、E take()

6、E poll( long time, timeunit unit)

7、boolean remove()

8、E element()

9、E peek()

注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、LinkedBlockingDeque

注意:LinkedBlockingDeque底层利用ReentrantLock实现同步,并不像ConcurrentLinkedDeque那样采用无锁算法。

如何使用 LinkedBlockingDeque

使用 LinkedBlockingDeque 的风险

2.1、内部结构

LinkedBlockingDeque内部是双链表的结构,结点Node的定义如下:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /** 双向链表节点 */
    static final class Node<E> {
        /**
         *  节点元素,如果节点已经被移除,则为 null
         */
        E item;

        /**
         * 前驱结点指针.
         */
        Node<E> prev;

        /**
         * 后驱结点指针.
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}   

2.2、成员属性

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 头结点
     */
    transient Node<E> first;

    /**
     * 尾结点
     */
    transient Node<E> last;

    /** 队列中个数 */
    private transient int count;

    /** 队列长度,可以使用构造注入,如未设定,默认为无界队列 */
    private final int capacity;

    /** 显示锁 */
    final ReentrantLock lock = new ReentrantLock();

    /** 消费队列(队列为空时,无法消费,线程阻塞) */
    private final Condition notEmpty = lock.newCondition();

    /** 生产队列(队列满时,无法入队,线程阻塞) */
    private final Condition notFull = lock.newCondition();
}   

2.3、构造函数

LinkedBlockingDeque一共三种构造器,不指定容量时,默认为Integer.MAX_VALUE:

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     * 默认构造器.
     */ 
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    
    /**
     * 指定容量的构造器.
     */ 
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    /**
     * 从已有集合构造队列.
     */
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }   
}   

2.4、队首入队

初始:


队首插入结点node:


public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  将目标元素 e 添加到队列头部,如果队列已满,则阻塞等待有可用空间后重试
     */ 
    public void putFirst(E e) throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试在头部添加元素
            while (!linkFirst(node))
                // 当前线程在非满条件上等待
                notFull.await();
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    
    private boolean linkFirst(Node<E> node) {
        // 队列已满,则直接返回 false
        if (count >= capacity)
            return false;
        // 读取头节点    
        Node<E> f = first;
        // 将旧头结点链接到目标节点之后
        node.next = f;
        // 写入新头节点
        first = node;
        // 1)当前元素为第一个添加到队列中的元素
        if (last == null)
            // 写入尾节点
            last = node;
        else
            // 将旧头节点的前置节点设置为新头结点
            f.prev = node;
        // 递增计数 
        ++count;
        // 唤醒在非空条件上阻塞等待的线程来读取元素
        notEmpty.signal();
        return true;
    }   
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列头部
     */ 
    public boolean offerFirst(E e) {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试在头部添加元素
            return linkFirst(node);
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  在指定的超时时间内尝试将目标元素 e 添加到队列头部,成功则返回 true
     */ 
    public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 头结点添加失败
            while (!linkFirst(node)) {
                // 已经超时则直接返回
                if (nanos <= 0)
                    return false;
                // 当前线程在非满条件上阻塞等待,唤醒后再次尝试添加 
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}   

2.4、队尾入队

初始:


队尾插入结点node:


public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    
    /**
     *  将目标元素 e 添加到队列尾部,如果队列已满,则阻塞等待有可用空间后重试    
     */ 
    public void putLast(E e) throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试将节点链接到队列尾部
            while (!linkLast(node))
                // 队列已满,当前线程在非满条件上阻塞等待,被唤醒后再次尝试
                notFull.await();
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
    
    private boolean linkLast(Node<E> node) {
        // 队列已满,则直接返回 false
        if (count >= capacity)
            return false;
        // 读取尾节点    
        Node<E> l = last;
        // 将目标节点链接到尾节点之后
        node.prev = l;
        // 写入尾节点为新增节点
        last = node;
        // 1)当前元素是第一个加入队列的元素
        if (first == null)
            // 写入头结点
            first = node;
        else
            // 将旧尾节点的后置节点更新为新增节点
            l.next = node;
        // 递增总数 
        ++count;
        // 唤醒在非空条件上等待的线程
        notEmpty.signal();
        return true;
    }   
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e) {
        return offerLast(e);
    }
    
    /**
     *  如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列尾部
     */ 
    public boolean offerLast(E e) {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 尝试将节点链接到队列尾部
            return linkLast(node);
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        return offerLast(e, timeout, unit);
    }

    /**
     *  在指定的超时时间内尝试将目标元素 e 添加到队列尾部,成功则返回 true
     */
    public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 如果存入的值为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            // 尝试将目标元素 e 添加到队列尾部
            while (!linkLast(node)) {
                // 已经超时则直接返回 false
                if (nanos <= 0)
                    return false;
                // 当前线程在非满条件上阻塞等待,被唤醒后再次尝试  
                nanos = notFull.awaitNanos(nanos);
            }
            return true;
        } finally {
            //释放锁
            lock.unlock();
        }
    }   
}   

2.5、队首出队

初始:


删除队首结点:


public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E take() throws InterruptedException {
        return takeFirst();
    }
    
    /**
     *  移除并返回头部节点,如果队列为空,则阻塞等待有可用元素之后重试
     */ 
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 尝试移除并返回头部节点
            while ( (x = unlinkFirst()) == null)
                // 队列为空,则阻塞等待有可用元素之后重试
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll() {
        return pollFirst();
    }
    
    /**
     *  如果队列为空,则立即返回 null,否则移除并返回头部元素
     */ 
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }   
    
    /**
     * 从队首删除一个元素, 失败则返回null.
     */ 
    private E unlinkFirst() {    
        // 获取首节点    
        Node<E> f = first;    
        // 首节点为null,则返回null    
        if (f == null)        
            return null;    
        // 获取首节点的后继节点    
        Node<E> n = f.next;    
        // 移除first,将首节点更新为n
        E item = f.item;    
        f.item = null;    
        f.next = f; // help GC    
        first = n;    
        // 移除首节点后,为空队列    
        if (n == null)        
            last = null;    
        else        
            // 将新的首节点的前驱节点设置为null        
            n.prev = null;    
        --count;    
        // 唤醒阻塞在notFull上的线程    
        notFull.signal();    
        return item;
    }
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return pollFirst(timeout, unit);
    }
    
    /**
     *  在指定的超时时间内尝试移除并返回头部元素,如果已经超时,则返回 null
     */ 
    public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 尝试移除并返回头部元素
            while ( (x = unlinkFirst()) == null) {
                // 已经超时则返回 null
                if (nanos <= 0)
                    return null;
                // 当前线程在非空条件上阻塞等待,被唤醒后进行重试  
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功则直接返回头部元素
            return x;
        } finally {
            lock.unlock();
        }
    }   
}   

2.6、队尾出队

初始:


删除队尾结点:


public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  移除并返回尾部节点,如果队列为空,则阻塞等待有可用元素之后重试
     */ 
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 尝试移除并返回尾部节点
            while ( (x = unlinkLast()) == null)
                // 队列为空,则阻塞等待有可用元素之后重试
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * 从队尾删除一个元素, 失败则返回null.
     */     
    private E unlinkLast() {       
        // 获取尾节点    
        Node<E> l = last;    
        // 尾节点为null,则返回null    
        if (l == null)        
            return null;    
        // 获取尾节点的前驱节点    
        Node<E> p = l.prev;    
        // 移除尾节点,将尾节点更新为p    
        E item = l.item;    
        l.item = null;    
        l.prev = l; // help GC    
        last = p;    
        // 移除尾节点后,为空队列    
        if (p == null)        
            first = null;    
        else        
            // 将新的尾节点的后继节点设置为null        
            p.next = null;    
        --count;    
        // 唤醒阻塞在notFull上的线程    
        notFull.signal();    
        return item;
    }   
}   
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    /**
     *  如果队列为空,则抛出异常,否则移除并返回尾部元素
     */ 
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    /**
     *  如果队列为空,则立即返回 null,否则移除并返回尾部元素
     */ 
    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //从队尾删除一个元素, 失败则返回null. 
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }   
}
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    
    public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            // 尝试移除并返回尾部元素
            while ( (x = unlinkLast()) == null) {
                // 已经超时则返回 null
                if (nanos <= 0)
                    return null;
                // 当前线程在非空条件上阻塞等待,被唤醒后进行重试
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 移除成功则直接返回尾部元素
            return x;
        } finally {
            lock.unlock();
        }
    }
}

三、LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue

LinkedBlockingDeque

参考:
https://www.itzhai.com/articles/graphical-blocking-queue.html

https://www.cnblogs.com/zhuxudong/p/10079511.html

https://segmentfault.com/a/1190000016398508

https://www.cnblogs.com/snake107/p/12035580.html

上一篇 下一篇

猜你喜欢

热点阅读