Java 并发程序员

【Java 并发笔记】7 种阻塞队列相关整理

2019-01-15  本文已影响5人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

Queue 接口

入队 出队 检索 处理方式
add() remove() element() 在执行方法失败时不返回值,抛出异常。
offer() poll() peek() 在执行方法时,给出返回值,比如 false、null。

BlockingQueue 接口

阻塞入队 阻塞出队 定时入队 定时出队
put(E e) E take() offer(E e,long timeout,TimeUnit unit) E poll(long timeout,TimeUnit unit)

2 Java 中的阻塞队列

阻塞队列 说明
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);
......
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
}

LinkedBlockingQueue

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

PriorityBlockingQueue

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
}

DelayQueue


Deque 接口

队头入队 队头出队 队尾入队 队尾出队 队头检索 队尾检索 处理方式
addFirst() removeFirst() addLast() removeLast() getFirst() getLast() 在方法执行失败时会抛出异常
offerFirst() pollFirst() offerLast() pollLast() peekFirst() peekLast() 在方法执行失败时会返回 false 或者 null。

SynchronousQueue

LinkedTransferQueue

LinkedBlockingDeque


BlockingDeque 接口

阻塞队头入队 阻塞队头出队 阻塞队尾入队 阻塞队尾出队 处理方式
putFirst(E e) E takeFirst() putLast(E e) E takeLast() 没有超时设置
offerFirst(E e,long timeout,TimeUnit unit) E pollFirst(long timeout,TimeUnit unit) offerLast(E e,long timeout,TimeUnit unit) E pollLast(long timeout,TimeUnit unit) 在超时之后,返回 false 或者 null。

3. 阻塞队列的实现原理

3.1 BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {
    //添加失败时会抛出异常
    boolean add(E e);

    //添加失败时会返回 false
    boolean offer(E e);

    //添加元素时,如果没有空间,会阻塞等待;可以响应中断
    void put(E e) throws InterruptedException;

    //添加元素到队列中,如果没有空间会等待参数中的时间,超时返回,会响应中断
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //获取并移除队首元素,如果没有元素就会阻塞等待
    E take() throws InterruptedException;

    //获取并移除队首元素,如果没有就会阻塞等待参数的时间,超时返回
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回队列中剩余的空间
    int remainingCapacity();

    //移除队列中某个元素,如果存在的话返回 true,否则返回 false
    boolean remove(Object o);

    //检查队列中是否包含某个元素,至少包含一个就返回 true
    public boolean contains(Object o);

    //将当前队列所有元素移动到给定的集合中,这个方法比反复地获取元素更高效
    //返回移动的元素个数
    int drainTo(Collection<? super E> c);

    //移动队列中至多 maxElements 个元素到指定的集合中
    int drainTo(Collection<? super E> c, int maxElements);
}

3.2 ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //使用数组保存的元素
    final Object[] items;
    //下一次取元素的索引
    int takeIndex;
    //下一次添加元素的索引
    int putIndex;
    //当前队列中元素的个数
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    //全部操作的锁
    final ReentrantLock lock;
    //等待获取元素的锁
    private final Condition notEmpty;
    //等待添加元素的锁
    private final Condition notFull;
    //...
}

构造函数

//指定队列的容量,使用非公平锁
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
//允许使用一个 Collection 来作为队列的默认元素
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {    //遍历添加指定集合的元素
                if (e == null) throw new NullPointerException();
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;    //修改 putIndex 为 c 的容量 +1
    } finally {
        lock.unlock();
    }
}

add 方法

public boolean add(E e) {
    return super.add(e);
}
//super.add() 的实现
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer 方法

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();
}

put 方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

offer(E,long,TimeUnit) 方法

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

poll 方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

take 方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

poll(long,TimeUnit) 方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

peek 方法

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return (E) items[i];
}

总结

  1. 一旦创建,则容量不能再改动
  2. 这个类是线程安全的,并且迭代器也是线程安全的。
  3. 这个类的 puttake 方法分别会在队列满了和队列空了之后被阻塞操作。
  4. 这个类提供了 offerpoll 方法来插入和提取元素,而不会在队列满了或者队列为空时阻塞操作。
  5. 这个队列的锁默认是 不公平 策略,即唤醒线程的顺序是不确定的。

3.3 LinkedBlockingQueue

//链表结点
static class Node<E> {
    E item;

    //对当前结点的后一个结点,有三种情况:
    //1.真正的结点
    //2.当前结点本身,说明当前结点是头结点
    //3.null,说明这个结点是尾结点
    Node<E> next;

    Node(E x) { item = x; }
}

//当前容量,默认是 Integer.MAX_VALUE
private final int capacity;
//当前队列中的元素数量
private final AtomicInteger count = new AtomicInteger();
//队列中的头结点,头结点的.item 永远为 null
transient Node<E> head;
//队列的尾结点,尾结点的 next 永远为 null
private transient Node<E> last;

//获取元素的锁
private final ReentrantLock takeLock = new ReentrantLock();
//等待取元素的等待队列
private final Condition notEmpty = takeLock.newCondition();
//添加元素的锁
private final ReentrantLock putLock = new ReentrantLock();
//等待添加元素的等待队列
private final Condition notFull = putLock.newCondition();

构造函数

//使用 Integer.MAX_VALUE 作为容量
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//指定最大容量
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

//使用 Integer.MAX_VALUE 作为容量,同时将指定集合添加到队列中
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {    //遍历添加到队列
            if (e == null)    //需要注意待添加集合中不能有空值
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

put 方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素构造结点
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程
    try {
        while (count.get() == capacity) { // 如果容量满了
            notFull.await(); // 阻塞并挂起当前线程
        }
        enqueue(node); // 结点添加到链表尾部
        c = count.getAndIncrement(); // 元素个数+1
        if (c + 1 < capacity) // 如果容量还没满
            notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用put方法
    }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

offer 方法

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 如果容量满了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量没满,以新元素构造结点
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程
    try {
        if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行
            enqueue(node); // 结点添加到链表尾部
            c = count.getAndIncrement(); // 元素个数+1,并返回旧值
            if (c + 1 < capacity) // 如果容量还没满
                notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
        }
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法
    }
    if (c == 0) // 如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
    return c >= 0; // 添加成功返回true,否则返回false
}

offer(E,long,TimeUnit) 方法

take 方法

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程
    try {
        while (count.get() == 0) { // 如果队列里已经没有元素了
            notEmpty.await(); // 阻塞并挂起当前线程
        }
        x = dequeue(); // 删除头结点
        c = count.getAndDecrement(); // 元素个数-1
        if (c > 1) // 如果队列里还有元素
            notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
    } finally {
        takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法
    }
    if (c == capacity) // 如果队列中还可以再插入数据
        signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
    return x;
}
public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁
    try {
        for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判断是否找到对象
                unlink(p, trail); // 修改结点的链接信息,同时调用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2个锁解锁
    }
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;    //指向队首的结点后移
    E x = first.item;
    first.item = null;
    return x;
}

3.4 PriorityBlockingQueue

构造函数

private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private transient Object[] queue;
private transient int size;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

offer 方法

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];    //扩容数组
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);    //拷贝原有数据
    }
}

保证优先级

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) { // 循环比较
        // 寻找k的父元素下标,固定规则
        int parent = (k - 1) >>> 1;    
        Object e = array[parent];
        // 自下而上一般出现在插入元素时调用,插入元素是插入到队列的最后,则需要将该元素调整到合适的位置
        // 即从队列的最后往上调整堆,直到不小于其父结点为止,相当于冒泡              
        if (key.compareTo((T) e) >= 0)    //比较
            break;
        // 如果当前结点<其父结点,则将其与父结点进行交换,并继续往上访问父结点
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

poll 方法

public E poll() {
    // size==0队列为0,直接返回null
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    // 出队总是将数组的第一个元素进行出队,
    E result = (E) queue[0];
    E x = (E) queue[s];
    queue[s] = null;
    if (s != 0)
        // 同时将队列的最后一个元素放到第一个位置,然后自上而下调整堆
        siftDown(0, x);
    return result;
}
private void siftDownUsingComparator(int k, E x) {
    // 由于堆是一个二叉树,所以size/2是树中的最后一个非叶子结点
    // 如果k是叶子结点,那么其无子结点,则不需要再往下调整堆
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = queue[child];
        // 右结点
        int right = child + 1;
        // 找出两个子结点以及父结点中较小的一个
        if (right < size &&
            comparator.compare((E) c, (E) queue[right]) > 0)
            c = queue[child = right];
        // 如果父结点最小,则无需继续往下调整堆
        if (comparator.compare(x, (E) c) <= 0)
            break;
        // 否则将父结点与两个子结点中较小的一个交换,然后往下继续调整
        queue[k] = c;
        k = child;
    }
    queue[k] = x;
}

3.5 DelayQueue

public interface Delayed extends Comparable<Delayed> {
    //返回当前对象的剩余执行时间
    long getDelay(TimeUnit unit);
}

3.5.1 DelayQueue 的关键属性

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();
属性 说明
ReentrantLock lock 重入锁。
PriorityQueue q 无界的、优先级队列。
Thread leader Leader-Follower 模型中的 leader
Condition available 队首有新元素可用或者有新线程成为 leader 时触发的 condition。

PriorityQueue

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}

Leader-Follower 模型

Leader-Follower 模型

实现 Delayed 接口

延时阻塞队列的实现

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();    //先获取队首元素,不删除
            if (first == null)    //如果为空就阻塞等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)    //比较元素延时时间是否到达
                    return q.poll();    //如果是就移除并返回
                first = null; // don't retain ref while waiting
                if (leader != null)    //如果有 leader 线程,依然阻塞等待
                    available.await();
                else {        //如果没有 leader 线程,指定当前线程,然后等待任务的待执行时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {        //最后等待时间到了后,就通知阻塞的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

//PriorityQueue.peek()
public E peek() {
    return (size == 0) ? null : (E) queue[0];
}

3.6 SynchronousQueue

private transient volatile Transferer<E> transferer;

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

put 方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {    
        Thread.interrupted();
        throw new InterruptedException();
    }
}
/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; 
    int mode = (e == null) ? REQUEST : DATA;    //判断是添加还是获取

    for (;;) {
        SNode h = head;      //获取栈顶结点  
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())    //如果头结点无法获取,就去获取下一个
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //设置头结点
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

结论

3.7 LinkedTransferQueue

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {...}

TransferQueue

public interface TransferQueue<E> extends BlockingQueue<E> {
    //尽可能快地转移元素给一个等待的消费者
    //如果在这之前有其他线程调用了 taked() 或者 poll(long,TimeUnit) 方法,就返回 true
    //否则返回 false
    boolean tryTransfer(E e);

    //转移元素给一个消费者,在有的情况下会等待直到被取走
    void transfer(E e) throws InterruptedException;

    //在 timeout 时间内将元素转移给一个消费者,如果这段时间内传递出去了就返回 true
    //否则返回 false
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //如果至少有一个等待的消费者,就返回 true
    boolean hasWaitingConsumer();

    //返回等待获取元素的消费者个数
    //这个值用于监控
    int getWaitingConsumerCount();
}

transfer 方法

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

tryTransfer 方法

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

3.8 LinkedBlockingDeque

关键属性

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

4. 阻塞队列的特点

ArrayBlockingQueue

LinkedBlockingQueue

PriorityBlockingQueue

DelayQueue

SynchronousQueue

LinkedTransferQueue

LinkedBlockingDeque

参考资料

https://blog.csdn.net/u011240877/article/details/73612930#1arrayblockingqueue
https://blog.csdn.net/fuyuwei2015/article/details/72716753
https://blog.csdn.net/tonywu1992/article/details/83419448

上一篇 下一篇

猜你喜欢

热点阅读