程序员

六.线程同步容器-阻塞队列

2017-09-19  本文已影响0人  蜗牛1991

一.层次图

二.BlockingQueue接口

BlockingQueue是一个支持两个附加操作的队列:

image.png

三.ArrayBlockingQueue 实现类

    //容器
    final Object[] items;
    //锁
    final ReentrantLock lock;
    //通知队列
    private final Condition notEmpty;
    private final Condition notFull;
     public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();//返回一个AQS的ConditionObject
        notFull =  lock.newCondition();
    }
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //如果线程已被中断则直接抛出异常,否尝试获取锁,若获取锁失败,先将当前线程加入    
          AQS队列,之后获取队列中的Node节点,只有pre节点为head节点才获取再次尝试获取锁,
          若此时获取锁失败或pre节点不为head节点时,让当前线程阻塞。(详细请看AQS章节)
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();//若容器满,则将当前线程加入通知队列(详细请看AQS章节)
          //若容器不满,则将值插入数组
            insert(e);
        } finally {
            lock.unlock();//释放当前线程
        }
    }

参考:聊聊并发(七)——Java中的阻塞队列

四.LinkedBlockingQueue实现类


static class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }

五.PriorityBlockingQueue

   // 默认容量  
    private static final int DEFAULT_INITIAL_CAPACITY = 11;  
   //优先级队列,表示一个平衡二叉树堆:  
  private transient Object[] queue;  
   public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
 public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
       //如果队列已满,释放锁,获取扩容锁,成功则扩容,扩容后则重新获取锁,将原始队列拷贝的新的队列中。  
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            //释放锁
            lock.unlock();
        }
        return true;
    }

参考:PriorityBlockingQueue解析

上一篇 下一篇

猜你喜欢

热点阅读