程序员

并发容器-Queue

2017-08-23  本文已影响0人  zhaoyunxing

ConcurrentLinkedQueue

特点

方法

   public E poll() {
          restartFromHead:
          for (;;) {
              for (Node<E> h = head, p = h, q;;) {
                  E item = p.item;
  
                  if (item != null && p.casItem(item, null)) {
                    //成功的ca是线性化点
                    //从这个队列中删除项目
                      if (p != h) // hop two nodes at a time
                          updateHead(h, ((q = p.next) != null) ? q : p);
                      return item;
                  }
                  else if ((q = p.next) == null) {
                      updateHead(h, p);
                      return null;
                  }
                  else if (p == q)
                      continue restartFromHead;
                  else
                      p = q;
              }
          }
      }
 public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

ConcurrentLinkedDeque

特点

ArrayBlockingQueue

特点

       /**
        * Creates an {@code ArrayBlockingQueue} with the given (fixed)
        * capacity and default access policy.
        *
        * @param capacity the capacity of this queue
        * @throws IllegalArgumentException if {@code capacity < 1}
        */
       public ArrayBlockingQueue(int capacity) {
           this(capacity, false);
       }
       
  /**
   * Creates an {@code ArrayBlockingQueue} with the given (fixed)
   * capacity and the specified access policy.
   *
   * @param capacity the capacity of this queue
   * @param fair if {@code true} then queue accesses for threads blocked
   *        on insertion or removal, are processed in FIFO order;
   *        if {@code false} the access order is unspecified.
   * @throws IllegalArgumentException if {@code capacity < 1}
   */
  public ArrayBlockingQueue(int capacity, boolean fair) {
      if (capacity <= 0)
          throw new IllegalArgumentException();
      this.items = new Object[capacity];
      lock = new ReentrantLock(fair); //设置锁的公平
      notEmpty = lock.newCondition();
      notFull =  lock.newCondition();
  }

方法

DelayQueue

特点

TransferQueue

特点

SynchronousQueue

特点

     /**
      * Adds the specified element to this queue, waiting if necessary for
      * another thread to receive it.
      *
      * @throws InterruptedException {@inheritDoc}
      * @throws NullPointerException {@inheritDoc}
      */
     public void put(E e) throws InterruptedException {
         if (e == null) throw new NullPointerException();
         //立即转让,失败后异常
         if (transferer.transfer(e, false, 0) == null) {
             Thread.interrupted();
             throw new InterruptedException();
         }
     }

总结

阻塞队列

源码:github 欢迎各位同学指出问题/建议

上一篇下一篇

猜你喜欢

热点阅读