JavaJUC 并发专题并发

Java并发编程——ArrayBlockingQueue

2021-12-18  本文已影响0人  小波同学

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:


如上图:

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

2、boolean offer(E e)

3、void put(E e)

4、boolean offer(E e, long timeout, TimeUnit unit)

5、E take()

6、E poll( long time, timeunit unit)

7、boolean remove()

8、E element()

9、E peek()

注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、ArrayBlockingQueue

ArrayBlockingQueue使用的数据结构是数组

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 队列中元素保存的地方 */
    final Object[] items;

    /** 取元素的指针 记录下一次操作的位置 */
    int takeIndex;

    /** 放元素的指针 记录下一次操作的位置 */
    int putIndex;

    /** 元素数量 */
    int count;

    /** 保证并发访问的锁 */
    final ReentrantLock lock;

    /** 等待出队的条件 消费者监视器 */
    private final Condition notEmpty;

    /** 等待入队的条件 生产者监视器 */
    private final Condition notFull;
}   

构造函数

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //必须传入容量,可以控制重入锁是公平还是非公平
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化数组    
        this.items = new Object[capacity];
        // 创建重入锁及两个条件
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        //final修饰的变量不会发生指令重排
        final ReentrantLock lock = this.lock;
        lock.lock(); // 保证可见性 不是为了互斥  防止指令重排 保证item的安全
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }   
}   

2.1、入队

2.1.1、add(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean add(E e) {
        return super.add(e);
    }
}   

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
    
    // AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常  
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
}

2.1.2、offer(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e) {
        // 元素不可为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            if (count == items.length)
                // 如果数组满了就返回false  
                return false;
            else {
                // 如果数组没满就调用入队方法并返回true
                enqueue(e);
                return true;
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}   

2.1.3、put(E e) 方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        // 获取ReentrantLock锁
        final ReentrantLock lock = this.lock;
        // 加锁,如果线程中断了抛出异常
        lock.lockInterruptibly();
        try {
            // 如果队列满了,则进入条件队列进行等待
            while (count == items.length)
                notFull.await();
            // 队列不满,或者被取数线程唤醒了,那么会继续执行
            // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程   
            enqueue(e);
        } finally {
            // 释放ReentrantLock锁
            lock.unlock();
        }
    }
}   

2.1.4、offer(E e, long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                // 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false   
                nanos = notFull.awaitNanos(nanos);
            }
            //入队
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 把元素直接放在放指针的位置上
        items[putIndex] = x;
        // 如果放指针到数组尽头了,就返回头部
        if (++putIndex == items.length)
            putIndex = 0;
        // 数量加1 
        count++;
        // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了
        notEmpty.signal();
    }   
}   

2.2、出队

2.2.1、 remove()方法

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
    
    public E remove() 
        // 调用poll()方法出队
        E x = poll();
        if (x != null)
            // 如果有元素出队就返回这个元素
            return x;
        else
            // 如果没有元素出队就抛出异常
            throw new NoSuchElementException();
    }
}

2.2.2、 poll()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果队列里没有数据就直接返回null
            //否则从队列头部出队
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
}

2.2.3、 poll(long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果队列无元素,则阻塞等待nanos纳秒
            // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

2.2.4、 take()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //加锁,如果线程中断了抛出异常
        lock.lockInterruptibly();
        try {
            //队列中不存元素
            while (count == 0)
                /*
                 * 一直等待条件notEmpty,即被其他线程唤醒
                 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()
                 * 唤醒其他等待这个条件的线程,同时队列也不空了)
                 */
                notEmpty.await();
            //否则出队  
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 取取指针位置的元素
        E x = (E) items[takeIndex];
        // 把取指针位置设为null
        items[takeIndex] = null;
        // 取指针前移,如果数组到头了就返回数组前端循环利用
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素数量减1   
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒notFull条件  
        notFull.signal();
        return x;
    }   
}   

如下图,以put和take方法为例:


这里put和take使用了同一个ReentrantLock,不能并发执行。

2.3、缺点

参考:
https://www.itzhai.com/articles/graphical-blocking-queue.html

https://zhuanlan.zhihu.com/p/224946304

上一篇下一篇

猜你喜欢

热点阅读