Java

LinkedBlockingQueque初识

2018-05-31  本文已影响56人  jeffrey要努力

LinkedBlockingQueque是个线程安全的队列,先进先出,插入的时候从尾部添加,获取的时候从头部获取,里面维护了一个单项链表

然而我看了下他的出队操作
1 拿到头(head)
2 拿到第二个节点(first)
3 设置头的下一节点是自己,帮助GC。(对GC不太熟,猜测是他不持有别的的对象,别的对象也不持有它的引用了。这样等到GC的时候就可以标记清除)
4 第二个节点设置为头
5 取出第二个节点的item(就是我们添加的对象),在后面返回
6 把第二个节点的item设置为空

说好的出队是头部出队呢,这明明是第二个出队。

/**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

然后带着疑问看看Node的前世今生。
Node的定义


/**
   * Linked list node class.
   */
  static class Node<E> {
      E item;

      /**
       * One of:
       * - the real successor Node
       * - this Node, meaning the successor is head.next
       * - null, meaning there is no successor (this is the last node)
       */
      Node<E> next;

      Node(E x) { item = x; }
  }

一个Node就是单向链表的一个节点,item是该节点的内容,next指向下一个节点,而队列是对尾部进行插入操作,对头部进行读操作,所以只需要记录头和尾就可以了。LinkedBlockingQueque里面:

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
 //链表的头,不变的量:head.item 永远是null
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */

//链表的尾巴,不变的量:last.next 永远是null
private transient Node<E> last;

这里有个transient关键字,transient的意思是LinkedBlockingQueque实现了Serilizable接口,但是Head这个属性不会序列化。

有个疑问:这样的话序列化之后再反序列化 head的值就没了。数据咋读呢?

上面的代码注释直接就说了 head.item 永远是null
看下head和last的创建、插入操作

创建:在构造函数中初始化了head last ,head就是last且都是空item

public LinkedBlockingQueue(int capacity) {
       if (capacity <= 0) throw new IllegalArgumentException();
       this.capacity = capacity;
       last = head = new Node<E>(null);
   }

插入:在LinkedBlockingQueue的put方法中调用了enqueue

public void put(E e) throws InterruptedException {
  .....
  Node<E> node = new Node<E>(e);
  .....
  enqueue(node);
  .....
}

private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
}

添加的时候就是把尾部的next指向最新的node,然后移动last到最后。全程没有head的事
last = last.next = node 拆开来是

last.next = node  
last = last.next  

但是,当链表是空的时候是last=head 插入第一个node时

last.next = node  =>    head.next = node  (node这个节点就是第一个插入值)
last = last.next  =>    last = head.next  

第一次插入完成后head和last分道扬镳,从这里就可以看出,head其实是起了个标志的作用,head的next才是第一个值所在的节点,所以dequeue才会读取head的后一个节点

阻塞

blocking是阻塞的意思,看take(从头部获取) 和 put(插入尾部)可以了解下大概

  /** Lock held by take, poll, etc */
  private final ReentrantLock takeLock = new ReentrantLock();

  /** Wait queue for waiting takes */
  private final Condition notEmpty = takeLock.newCondition();


  public E take() throws InterruptedException {
          E x;
          int c = -1;
          final AtomicInteger count = this.count;
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lockInterruptibly();
          try {
              while (count.get() == 0) {
                //为0等待新的put的信号
                  notEmpty.await();
              }
              x = dequeue();
              c = count.getAndDecrement();
              if (c > 1)
                  notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
          if (c == capacity)
              signalNotFull();
          return x;
      }



    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }    

通过takeLock这个重入锁来实现阻塞等待
AtomicInteger是一个提供原子操作的integer对象,count是存储对象的总数。
现在我们假设目前存储的对象为空,最大的容量为Integer.MAX_VALUE
get()的时候count是为0的,那么就等待notEmpty这个condition的信号,get所在的线程挂起等待
再看put的简化版本就一目了然了,如果e插入成功,count就增加一位,那么就通知下take锁的notEmpty这个condition。然后get线程就可以等待结束,继续运行读取对象。这样就实现了读的阻塞等待。

    public void put(E e) throws InterruptedException {
        ....
        int c = -1;
        ....
            c = count.getAndIncrement();
        ....
        if (c == 0)
            signalNotEmpty();
    }  

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

写的阻塞等待也是一样,有个叫putLock和他的一个叫notFull的condition来控制。满了就通过notFull挂起等待读取操作,有线程执行get后通知notFull继续运行

上一篇 下一篇

猜你喜欢

热点阅读