JUC并发相关

28. 并发终结之LinkedBlockingQueue

2020-09-29  本文已影响0人  涣涣虚心0215

与ArrayBlockingQueue不同,LinkedBlockingQueue内部则是使用链表构造的无界阻塞队列。
它内部使用了两个锁(putLock,takeLock),锁的粒度更细,但是因为使用的链表,所以相比较ArrayBlockingQueue,它需要动态的创建和删除链表节点,造成垃圾回收的负担。

内部数据结构

ArrayBlockingQueue内部使用的Object[]数组,LinkedBlockingQueue内部就用的链表。
且这里的count是原子类型,因为分为两个锁,所以仅仅volatile是不够的,还需要CAS保证原子性

//链表的节点
static class Node<E> {
    E item;
    LinkedBlockingQueue.Node<E> next;
    Node(E x) { item = x; }
}
//可以限制LinkedBlockingQueue的大小,变成有界的
private final int capacity;
//ArrayBlockingQueue也是用count表示元素个数,但是是基础类型的,因为更新都在lock内部
private final AtomicInteger count = new AtomicInteger();
//头结点
transient LinkedBlockingQueue.Node<E> head;
//尾节点
private transient LinkedBlockingQueue.Node<E> last;
//LinkedBlockingQueue使用两个锁来控制,粒度更细
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//notEmpty表示不为空,可以取元素
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//notFull表示不满,可以放元素
private final Condition notFull = putLock.newCondition();
add & remove

与ArrayBlockingQueue类似,add方法调用的是offer方法;remove方法调用的是poll方法。

offer

offer流程与ArrayBlockingQueue类似,判断queue是否满,不满则enqueue压入队列。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //如果count==capacity表示队列已经满了
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //putLock上锁
    putLock.lock();
    try {
        //如果count<capacity表示队列还没有满
        if (count.get() < capacity) {
             //压入元素进入队列
            enqueue(node);
            //将count加1
            c = count.getAndIncrement();
            //如果c+1还是小于capacity,表示队列还没满,就唤醒notFull上等待队列(put,offer(time))
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    //最后这边c==0才会唤醒take方法去去元素,因为避免频繁的加锁唤醒
所以只有在queue为空之后放入第一个元素才会唤醒。
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
//链表加入新元素
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
//唤醒notEmpty上的等待线程(take, poll(time))
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    //唤醒是需要加锁的,所以不是每次添加元素都会去唤醒一次
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
put

put与offer都类似,只是在queue满的时候进入等待。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //可以响应中断的lock
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //当queue满的时候进入wait,等待take唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        //加入新元素到queue
        enqueue(node);
        //count值加1
        c = count.getAndIncrement();
        //因为并不是每次都会唤醒这些put的等待队列,所以这边会进行一次唤醒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
     //只有在queue为空放入第一个元素的时候才会进行唤醒take。
    if (c == 0)
        signalNotEmpty();
}
offer(time)

在内部会进行等待,但是有等待时间限制,不会一直无限等待;一定等待时间之内放不进元素,就return false

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new LinkedBlockingQueue.Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
poll

poll在queue为空时,返回null。

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //当queue不为空的时候开始取出元素
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            //唤醒其他take,poll(time)线程开始取元素
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    //当queue满的时候被取走了一个,才会去唤醒put, offer(time)进行压入元素
    //不是每次取走一个元素就来唤醒
    if (c == capacity)
        signalNotFull();
    return x;
}
//从queue中取出元素
private E dequeue() {
    //head是一个空的Node对象
    LinkedBlockingQueue.Node<E> h = head;
    LinkedBlockingQueue.Node<E> first = h.next;
    //这里就是将head移动到下一个位置, 将原来的head回收掉
    h.next = h; // help GC
    head = first;
    E x = first.item;
    //元素被取出后,head的item应当为null
    first.item = null;
    return x;
}
take

与poll类似,只是多了响应中断以及queue为空的时候,notEmpty需要await

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //当queue为空的时候,就需要在notEmpty等待队列里等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

poll(time)

与take类似,只是等待有时间限制,在一定时间之内,queue还是为空,则返回null。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
上一篇 下一篇

猜你喜欢

热点阅读