LinkedBlockingQueue

2017-05-24  本文已影响0人  风吹过山

一、LinkedBlockingQueue是什么?
LinkedBlockingQueue是一个线程安全的阻塞队列,具有先进先出等特性。主要实现BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法。

二、线程安全?
在LinkedBlockingQueue的所有共享的全局变量 中,final声明的capacity在构造器生成实例时就成了不可变量了。而final声明的count由于是AtomicInteger类型的,所以能 够保证其操作的原子性。剩下的final的变量都是初始化成了不可变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和 last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的 ,其中LinkedBloakingQueue是使用读和写两把锁来控制并发操作的。 如下:

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

三、链表的入队和出队
LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联
入队:
put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功;
出队:
take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null

四、take和put

    //出队
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//随时保证响应中断 
        try {
            while (count.get() == 0) {
                notEmpty.await();//队列为null时,,此时线程将处于阻塞状态,直到有新的元素
            }
            x = dequeue();//取出元素
            c = count.getAndDecrement(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
            if (c > 1)
                notEmpty.signal();//唤醒其他正在等待出队列的线程
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

  //入队
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//随时保证响应中断 
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();//队列满时,,此时线程将处于阻塞状态,直到队列有空闲的位置才继续执行
            }
            enqueue(node);// //让元素进入队列的末尾
            c = count.getAndIncrement();//c是旧值,为了下面的c==0的判断。意思就是,之前没有元素,现在有了,其他线程赶紧来获取。
            if (c + 1 < capacity)
                notFull.signal();//唤醒其他正在等待入队列的线程
        } finally {
            putLock.unlock();
        }
        if (c == 0)  /*当c=0时,当前队列为空队列,出队列的线程都处于等待状态,
        现在新添加了一个新的元素,即队列不再为空,因此它会唤醒正在等待获取元素的线程。
        */
            signalNotEmpty();
    }

其中enqueue和dequeue
enqueue()的源码如下:将元素E添加到队列的末尾,设置node为新的尾节点!

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

dequeue()的源码如下:删除队列的头节点,并将表头指向“原头节点的下一个节点”。

private E dequeue() {
 // assert takeLock.isHeldByCurrentThread();
 // assert head.item == null; 
  Node<E> h = head; 
  Node<E> first = h.next; 
  h.next = h; // help GC 
  head = first;
   E x = first.item; 
  first.item = null; 
  return x;
}

问题:
补充两个线程去生产和消费的图
Lock的条件变量? c= count.getAndIncrement();?count原子性的理解

上一篇下一篇

猜你喜欢

热点阅读