并发

Java1.8-LinkedBlockingQueue源码学习(

2019-03-01  本文已影响23人  骑着乌龟去看海

一、概述

  上文我们学习了ArrayBlockingQueue的源码,本篇我们接着来学习LinkedBlockingQueue。ArrayBlockingQueue底层通过数组来实现,而与数组相对应,LinkedBlockingQueue底层是通过链表来实现的。

LinkedBlockingQueue 是基于链表实现的有界的阻塞队列。为了防止链表过度扩展,LinkedBlockingQueue的容量是可选的,如果未指定容量,容量最大值是Integer.MAX_VALUE;

二、源码

1. 继承结构

继承结构和ArrayBlockingQueue是一样的,这里就不多说了:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
2. 属性

接下来,我们先来看一下它的一些属性:

/** 链表容量, 如果没有指定,默认Integer.MAX_VALUE */
private final int capacity;

/** 链表中元素的个数 */
private final AtomicInteger count = new AtomicInteger();

/** 链表的头节点,头节点中不保存元素 */
transient Node<E> head;

/** 链表的尾节点,尾节点中不保存引用 */
private transient Node<E> last;

/** 出队元素锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 出队元素的Condition条件 */
private final Condition notEmpty = takeLock.newCondition();

/** 入队元素锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 入队元素的Condition条件 */
private final Condition notFull = putLock.newCondition();

从这里可以看出,LinkedBlockingQueue包含了读重入锁,写重入锁,实现了锁的读写分离,并且不同的锁有不同的Condition条件。并且头节点和尾节点还有一些特性:

头节点不包含元素,即head.item == null,也就是head.item一直为null;而尾节点不包含引用,即last.next == null,也就是last.next一直为null。

3. 构造方法

先来看一下前两个简单的构造方法:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    // 容量值不能小于0
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化头节点和尾节点
    last = head = new Node<E>(null);
}

默认情况下,队列的容量是Integer.MAX_VALUE。来看另一个构造方法:

public LinkedBlockingQueue(Collection<? extends E> c) {
    // 初始化最大容量
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        // 循环将集合中元素添加到队列中
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            // 集合容量不能大于队列容量
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        // 设置数组容量
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

从给定集合初始化一个队列,并且队列中元素不能为null,所给定集合容量不能大于队列容量,然后将元素转化为Node节点,保存到队列中。这里调用了入队的方法enqueue,后面再来介绍这个方法。

4. 内部类

LinkedBlockingQueue中提供了一个内部类Node,用来保存元素,这一点和ArrayBlockingQueue是不同的,因为链表除了保存常规的元素外,还需要有指针,指向下一个节点,来简单看一下源码:

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

该节点比较简单,除了有一个元素类型之外,还有一个next指针,指向链表的下一个节点,该值如果为null,表示该节点没有后继节点,也就是该节点是最后一个节点。

5. 方法
5.1 put方法

首先来看下put方法,该方法是一个阻塞的入队方法,来看下源码:

public void put(E e) throws InterruptedException {
    // 元素非空判断
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    // 将元素转化为Node节点
    Node<E> node = new Node<E>(e);
    // 获取入队锁,及链表中元素个数
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 线程可中断
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        // 如果队列满了
        while (count.get() == capacity) {
            // 阻塞notFull条件对应的线程
            notFull.await();
        }
        // 线程可用了,进行入队
        enqueue(node);
        // 元素个数原子性增加,返回到是增加前的数量
        c = count.getAndIncrement();
        // 如果增加后的数量小于队列的容量,唤醒在notFull上等待的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果元素个数为0,表示队列原先为空,此时添加成功了,队列里有元素了,
    // 则需要唤醒在notEmpty条件上等待的线程
    if (c == 0)
        signalNotEmpty();
}

来简单看下流程:

这中间调用了入队的方法enqueue,这里来简单看下:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

该方法表示将节点添加至队列的尾部,然后更新链表的尾节点,可以画个简单的链表操作图来帮助理解。可能需要注意一点:

尾节点是不包含引用的,只包含元素对象;

至于唤醒notEmpty的方法signalNotEmpty,则比较简单,这里来看下源码:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
5.2 offer方法

put方法也就是入队方法,如果队列已满,返回false,其他的操作和put类似,这里来简单看下源码:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列满了,直接返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 队列没满
        if (count.get() < capacity) {
            // 入队
            enqueue(node);
            c = count.getAndIncrement();
            // 如果添加后队列依旧没满,唤醒notFull对应线程
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // 如果原队列为空,现在添加后不为空了,唤醒等在notEmpty上的线程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

而重载的超时方法offer(E, long, TimeUnit)逻辑实现也差不太多,来简单看下:

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    // 获取超时时间
    long nanos = unit.toNanos(timeout);
    int c = -1;
    // 获取入队锁
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 如果队列已满
        while (count.get() == capacity) {
            // 然后再判断超时时间是否大于0,
            if (nanos <= 0)
                return false;
            // 等待对应的超时时间,并且重置超时时间
            // 为什么重置呢,因为可能被notFull条件对应的线程进行唤醒操作
            nanos = notFull.awaitNanos(nanos);
        }
        // 入队
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
5.3 take方法

take方法是一个阻塞的出队方法,其与put方法是相对应的,我们来看一下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 如果队列为空,阻塞,直到队列不为空
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出队
        x = dequeue();
        c = count.getAndDecrement();
        // 如果出队后队列依旧不为空,唤醒notEmpty上等待的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果原来队列已满,现在出队了一个,说明队列可用,唤醒notFull对应等待的线程
    if (c == capacity)
        signalNotFull();
    return x;
}

来简单看下流程,流程其实是和put相对应:

这里调用到了出队方法dequeue方法,来简单看下:

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 原先头节点
    Node<E> h = head;
    // 原先头节点的下一个节点(第一个节点)
    Node<E> first = h.next;
    // 原先头节点的next指针指向自己
    h.next = h; // help GC
    // 头节点后移
    head = first;
    // 获取第一个节点的元素值
    E x = first.item;
    // 将原来的第一个节点设置为null
    first.item = null;
    return x;
}

该方法的目的是为了更新头节点,将头节点更新为原先头节点的下一个节点,可能需要注意的是:

头节点是不保存元素值的,只保存指向下一个节点的指针。

而至于signalNotFull方法,就不多说了。

5.4 poll方法

出队方法poll,表示如果队列为空,直接返回null,其他操作与take方法类似,来简单看下源码:

public E poll() {
    final AtomicInteger count = this.count;
    // 队列为空,直接返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 队列不为空,也就是队列中有元素
        if (count.get() > 0) {
            // 出队
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 同理,唤醒
    if (c == capacity)
        signalNotFull();
    return x;
}

而超时的poll方法与超时的offer方法是类似的,这里只贴下源码,不多说了:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
5.5 peek方法

至于peek方法,就比较简单,也没必要多说了:

public E peek() {
    // 队列为空
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 获取头节点的下一个节点
        Node<E> first = head.next;
        // 如果第一个节点为空
        if (first == null)
            return null;
        else
            // 取值
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
5.6 remove方法

remove方法表示移除队列中相应的元素,来看下源码:

public boolean remove(Object o) {
    if (o == null) return false;
    // 获取入队锁和出队锁
    fullyLock();
    try {
        // 循环遍历,从队头到队尾
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                // 移除元素
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 释放入队锁和出队锁
        fullyUnlock();
    }
}

我们来简单看下流程:

这里移除元素的操作会调用unlink方法,该方法用于将指定节点从链表中断开,我们来简单看下这个方法:

/**
 * 这里trail节点是p节点的前驱节点
 */
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    // 将p节点元素置为null
    p.item = null;
    // 将trail节点指向的下一个节点的引用,指向p节点的下一个节点
    // 也就是将p节点断开
    trail.next = p.next;
    // 如果p节点是尾节点,重置尾节点
    if (last == p)
        last = trail;
    // 如果原来队列已满,这里移除后,队列就可用了,唤醒在notFull上等待的线程
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

这里的操作还会判断如果移除元素之前,队列已满,那么移除之后,队列就可用了,这时候会唤醒在notFull上等待的线程。另外,fullyLockfullyUnlock比较简单,就不多说了:

/**
 * Locks to prevent both puts and takes.
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

/**
 * Unlocks to allow both puts and takes.
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

四、总结

到这里,大部分常用的方法都源码我们都学习过了,这里我们进行简单的总结下:

本文参考链接除了官方文档外,还参考自:
《Java并发编程实战》
【JUC】JDK1.8源码分析之LinkedBlockingQueue(四)

上一篇下一篇

猜你喜欢

热点阅读