IT技术

10. LinkedBlockingQueue

2019-03-20  本文已影响0人  shallowinggg

LinkedBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

LinkedBlockingQueue在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE作为上限。

LinkedBlockingQueue内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。

以下是如何实例化和使用LinkedBlockingQueue

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

源码

LinkedBlockingQueue内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count字段使用原子变量,避免修改它时需要同时获取两个锁。

static class Node<E> {
    E item;

    /**
     * 下面中的一个:
     * - 真实的后继节点
     * - 这个节点本身,此时原后继节点现在是head.next,即第一个元素
     * - null, 意味没有后继节点,此节点是队列最后一个节点
     */
    Node<E> next;

    Node(E x) { item = x; }
}

private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

增加操作

注意进行增加操作时,只对putLock加锁,如果还对takeLock也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count使用原子变量,进行CAS更新,防止数据不一致。

为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 注意即使count没有被锁保护,它依然可以被用作等待条件
        // 判定。因为此时count只会被减少(putLock已加锁),如果容量
        // 改变,会被唤醒。count在其他地方的使用也与此相似。

        // 队列已满,阻塞自己
        while (count.get() == capacity) {
            notFull.await();
        }
        // 插入队列中
        enqueue(node);
        // CAS更新count值
        c = count.getAndIncrement();
        // 如果队列没满,唤醒其他等待插入的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列原来是空队列,唤醒等待提取元素的线程
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 先加锁,才能调用对应Condtion的signal()方法
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 队列已满,返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待-超时机制
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

删除操作

删除操作与增加操作一样。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 当队列为空,阻塞自己
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 将头节点出队
        x = dequeue();
        c = count.getAndDecrement();
        // 如果队列还有元素,唤醒其他等待提取元素的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果原本队列是满的,唤醒增加线程,因为现在元素已经被取出,队列不满
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;

    // 头节点为空,其中不存储元素
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

// 删除一个指定元素
public boolean remove(Object o) {
    if (o == null) return false;
    // 将两个锁全部加锁
    fullyLock();
    try {
        for (Node<E> pred = head, p = pred.next;
             p != null;
             pred = p, p = p.next) {
            if (o.equals(p.item)) {
                // 从队列中移除此节点
                unlink(p, pred);
                return true;
            }
        }
        return false;
    } finally {
        // 释放全部两个锁
        fullyUnlock();
    }
}

void unlink(Node<E> p, Node<E> pred) {
    // assert putLock.isHeldByCurrentThread();
    // assert takeLock.isHeldByCurrentThread();
    // p.next没有被设置为null,为了保证迭代器遍历到p时继续工作,
    // 保证弱一致性
    p.item = null;
    pred.next = p.next;
    if (last == p)
        last = pred;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

访问操作

public E peek() {
    // 队列为空,返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 返回第一个元素
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}

其他操作

public void clear() {
    fullyLock();
    try {
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
            // 使得next指向自己
            h.next = h;
            // 解除对元素实体的引用
            p.item = null;
        }
        head = last;
        // assert head.item == null && head.next == null;
        // 如果原来队列是满的,唤醒等待的插入线程
        if (count.getAndSet(0) == capacity)
            notFull.signal();
    } finally {
        fullyUnlock();
    }
}


public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 获取当前队列中的元素数量
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            // 将n个元素加入到指定集合中
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

迭代器

LinkedBlockingQueue的迭代器与DelayQueue的不同,DelayQueue的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue的迭代器与内部的链表保持了弱一致性。

注意它的next()方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。

而它的forEachRemaining(Consumer<? super E> action)方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。

private class Itr implements Iterator<E> {
    private Node<E> next;           // 持有nextItem的节点
    private E nextItem;             // 下一个进行处理的元素
    private Node<E> lastRet;        // 上一个返回的元素,即当前正在使用的
    private Node<E> ancestor;       // Helps unlink lastRet on remove()

    Itr() {
        fullyLock();
        try {
            // 保存第一个元素
            if ((next = head.next) != null)
                nextItem = next.item;
        } finally {
            fullyUnlock();
        }
    }

    public boolean hasNext() {
        return next != null;
    }

    public E next() {
        Node<E> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        lastRet = p;
        E x = nextItem;
        fullyLock();
        try {
            E e = null;
            // 注意此处,遇到空节点会跳过去访问下一个节点
            for (p = p.next; p != null && (e = p.item) == null; )
                p = succ(p);
            next = p;
            nextItem = e;
        } finally {
            fullyUnlock();
        }
        return x;
    }
    
    Node<E> succ(Node<E> p) {
        // 正常出队的元素next字段会指向自己
        if (p == (p = p.next))
            p = head.next;
        return p;
    }
    
    public void forEachRemaining(Consumer<? super E> action) {
        // A variant of forEachFrom
        Objects.requireNonNull(action);
        Node<E> p;
        if ((p = next) == null) return;
        lastRet = p;
        next = null;
        final int batchSize = 64;
        Object[] es = null;
        int n, len = 1;
        do {
            fullyLock();
            try {
                if (es == null) {
                    p = p.next;
                    // 获取真正存在的元素的数量,如果多于64,分批进行,一批为64个
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == batchSize)
                            break;
                    es = new Object[len];
                    es[0] = nextItem;
                    nextItem = null;
                    n = 1;
                } else
                    n = 0;
                // n为1的使用只因为p=p.next,经过此步后p已经不是首元素,
                // 而是第二个元素。而后面批次的插入直接从0开始即可
                // 将元素放入数组中
                for (; p != null && n < len; p = succ(p))
                    if ((es[n] = p.item) != null) {
                        lastRet = p;
                        n++;
                    }
            } finally {
                fullyUnlock();
            }
            // 分别调用accept方法
            for (int i = 0; i < n; i++) {
                @SuppressWarnings("unchecked") E e = (E) es[i];
                action.accept(e);
            }
        } while (n > 0 && p != null);
    }

    public void remove() {
        // 获取当前元素
        Node<E> p = lastRet;
        if (p == null)
            throw new IllegalStateException();
        lastRet = null;
        fullyLock();
        try {
            if (p.item != null) {
                if (ancestor == null)
                    ancestor = head;
                // 获取p的前驱结点
                ancestor = findPred(p, ancestor);
                // 从链表中删除结点p
                unlink(p, ancestor);
            }
        } finally {
            fullyUnlock();
        }
    }
}

测试:

import org.junit.Test;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {
    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Test
    public void test() {
        queue.offer("1");
        queue.offer("2");
        queue.offer("3");
        queue.offer("4");

        Iterator<String> itr = queue.iterator();
        queue.remove("3");
        itr.forEachRemaining(System.out::println);
    }
}

输出如下:

1
2
4

核心要点

  1. 内部使用一个单向链表,以FIFO顺序存储
  2. 可以在链表两头同时进行操作,所以使用两个锁分别保护
  3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。
  4. 迭代器与单向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作
上一篇下一篇

猜你喜欢

热点阅读