ArrayBlockingQueue实现原理

2020-06-07  本文已影响0人  sandy_cheng

概述

ArrayBlockingQueue是常用的的FIFO阻塞队列,实现了BlockingQueue接口,是线程安全的。内部主要通过数组(Array)、锁(ReentrantLock)实现。
注:此队列中不能存放null元素。

常用方法

数据写入

boolean add(E e)

插入成功返回true,失败时抛出异常,add方法调用了父类AbstractQueue的add方法:

public boolean add(E e){
   return super.add(e);
}

AbstractQueue的add方法:

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

可见,add方法最终调用了本类的offer方法,当offer失败时,抛出"Queue full"异常,成功时返回true。

boolean offer(E e)

offer方法不会阻塞写入,写入成功返回true,当队列已满时,写入数据失败返回false,在写入过程中加锁,在使用offer写入数据是线程安全的。

public boolean offer(E e) {
        //非空验证,如果为空时抛出空指针异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //验证写入数量是否等于数组长度,如果相等,不进行写入
            if (count == items.length) 
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

boolean offer(E e, long time, timeunit unit)

方法加入了超时等待功能,在设定时间内会进行自旋等待,在设定时间内如果没有写入成功,返回false,写入成功返回true,此方法会响应中断:

 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //非空验证,如果为空,会抛出异常  
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
         //可中断锁,响应中断 
        lock.lockInterruptibly();
        try {
            //自旋等待
            while (count == items.length) {
                //超时,返回false写入失败
                if (nanos <= 0)
                    return false;
                //使用condition进行等待,此时线程进行time-waiting状态
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

put(E e)

put方法在添加元素时,如果队列已满,会进入阻塞状态,此方法会响应中断:

public void put(E e) throws InterruptedException {
        //非空验证,如果为空,会抛出异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //可响应中断锁
        lock.lockInterruptibly();
        try {
            //自旋
            while (count == items.length)
                //无限等待,直到notFull.signal或signalAll方法唤醒
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    } 

equeue(E e)

私有方法,equeue是写入底层方法,此方法直接对数组进行操作,将新元素写入到数组中:

    private void enqueue(E x) {
        //this.items即为存储元素的数组
        final Object[] items = this.items;
        //将元素按照顺序写入到数组的指定位置,并更新数组下标(加1)
        items[putIndex] = x;
        //++putIndex将putIndex加1并返回加1后的结果
        if (++putIndex == items.length)
            //当下标更新后达到数长度是,从数组下标为0开始写入,循环往复
            putIndex = 0;
        count++;
        //写入成功后,通知消费线程可继续进行消费
        notEmpty.signal();
    }

数据读取

E take()

从阻塞队列中读取一个元素,如果队列中无元素,将进入阻塞状态,如果超时返回null,此方法可响应中断:

//此方法与put方法类似,dequeue方法是从数组中顺序取出一个元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            //此时线程状态是waiting
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

E poll(long timeout, TimeUnit unit)

在设定时间内取出元素,如果超时,返回为空:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    //可响应中断锁
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            //超时,则返回为空
            if (nanos <= 0)
                return null;
            //进入超时等待
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

E peek()

获取队列头部元素,此方法不会阻塞,如果队列中无元素,返回为空,此方法不会移除队列中的元素:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty    
    } finally {
        lock.unlock();
    }
}

E element()

此方法是AbstractQueue的的方法,ArrayBlockingQueue没有对其重写,element方法调用了子类的peek方法,所以不会删除队列中的元素,当队列为空时,抛出NoSuchElementException:

public E element() {
    //调用子类的peek方法
    E x = peek();
    if (x != null)
        return x;
    else        
      throw new NoSuchElementException();
}

boolean remove(Object o)

删除一个元素,如果要删除的元素为null或者不存在,返回false,否则返回true,此方法不会阻塞线程:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    //不可中断锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            //循环比较元素并删除
            do {
                if (o.equals(items[i])) {
                    //删除指定元素,并重新对数组元素位置进行调整保证FIFO
                    removeAt(i);
                    //删除成功返回true
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

其它方法

boolean contains(Object o)

判断是否包含一个元素,是返回true,否返回false,此方法需要遍历队列中的元素,并比较,性能较低:

    public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i]))
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

int drainTo(Collection<? super E> c)

一次性获取队列中所有元素,内部调用了int drainTo(Collection<? super E> c, int maxElements)方法,maxElements参数为Integer.MAX_VALUE,优点是一次获取队列中所有元素,此操作会移除队列中锁获取的元素,避免了多次加锁造成的性能开销

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

int drainTo(Collection<? super E> c, int maxElements)

从头部获取指定数量的元素,获取的元素将会被从队列中移除。

上一篇下一篇

猜你喜欢

热点阅读