java并发相关java开发

java阻塞队列 LinkedBlockingQueue

2018-09-10  本文已影响8人  韭菜待收割
java.util.concurrent
//由单向链表结构组成的有界阻塞队列
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable


//内部维护的链表节点结构Node
static class Node<E> { 
  E item;//入队元素 
  Node<E> next;//后继节点 
  Node(E x) { 
    item = x; 
  } 
} 

//入队锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队锁
private final ReentrantLock putLock = new ReentrantLock();

1、常用方法

构造方法

/**
 * 在不指定时容量时为Integer.MAX_VALUE
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

父类AbstractQueue实现

/**
 * 增加一个元索,如果队列已满,则抛出异常
 */
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

/**
 * 移除并返回队列头部的元素,如果队列为空,则抛出异常
 */
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

/**
 * 返回队列头部的元素,如果队列为空,则抛出异常
 */
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

非阻塞方法

/**
 * 添加一个元素并返回true,如果队列已满,则返回false
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            //队列数据总数自增+1后返回,返回的是未+1的值
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //唤醒非满等待队列上的线程
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //队列中刚好有一个数据,唤醒非空等待队列 
        signalNotEmpty();
    return c >= 0;
}

/**
 * 移除并返问队列头部的元素,如果队列为空,则返回null
 * */
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)//出队后队列还是非空
                //唤醒非空等待队列上的线程
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //唤醒非满等待队列上的线程 
        signalNotFull();
    return x;
}

/**
 * 返回队列头部的元素,如果队列为空,则返回null
 */
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //返回head的next的item,因为head和tail的item是Null
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

阻塞方法

/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();//可被线程中断
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)//队列未满
            notFull.signal();//唤醒非满等待队列上的线程
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //队列上只有一个元素,唤醒非空等待队列上的线程
        signalNotEmpty();
}


/**
 * 移除并返回队列头部的元素,如果队列为空,则阻塞
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();//可被线程中断
    try {
        while (count.get() == 0) {
            notEmpty.await();//休眠非空等待队列上的线程
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();//唤醒非空等待队列上的线程 
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //唤醒非满等待队列上的线程
        signalNotFull();
    return x;
}

其它

/**
 * 没有遍历整个队列
 */
public int size() {
    return count.get();
}

2、入队出队操作

当队列添加元素后:
*会调用notFull.signal(); 唤醒非满等待队列上的线程;
*如果队列之前刚好是空的,会调用signalNotEmpty(); 唤醒非空等待队列上的线程。

当队列删除元素后:
*如果出队后队列还是非空,会调用notEmpty.signal(); 唤醒非空等待队列上的线程;
*如果队列之前刚好是满的,会调用signalNotFull(); 唤醒非满等待队列上的线程 。

上一篇下一篇

猜你喜欢

热点阅读