java并发编程(6):BlockingQueue相关源码详解

2019-11-02  本文已影响0人  桥头放牛娃

BlockingQueue的类继承结构如下,其主要实现类有:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue。

BlockingQueue的类继承结.png

1、ArrayBlockingQueue

ArrayBlockingQueue是由数组组成的有界阻塞队列。队列中元素按照FIFO的方式进行排序。默认情况下线程以非公平的方式获取队列中的数据。

1.1、主要属性

//队列元素的容器,以数组作为队列的实现
final Object[] items;

//下一个获取(take、poll、peek、remove)元素数据的数组索引
int takeIndex;

//下一个存储(put、offer、add)元素的数组索引
int putIndex;

//队列中元素的个数
int count;

//锁
final ReentrantLock lock;

//队列不为空的信号量,即当插入数据到队列的时候,
//会通过notEmpty信号量通知其他等待获取队列数据的线程
private final Condition notEmpty;

//队列不满的信号量,即当线程从队列中取出数据后,
//会通过notFull信号量通知其他线程可以继续向队列中存储元素
private final Condition notFull;

1.2、构造函数

//初始化队列的容量
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

//初始队列的容量,及是否为公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

//初始化队列容量,是否为公平锁、及初始队列中的元素
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    //获取锁
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            //将初始化的元素放入队列中
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

1.3、入队操作

//offer:非阻塞入队操作
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列中元素的数量已经达到队列容量的上线,则直接返回false,否则将元素enqueue()
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

//put:阻塞入队操作
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //若队列空间达到容量上线,则调用notFull.await()等待其他线程取走只是一个队列中的数据
        while (count == items.length)
            notFull.await();
        //当队列不满的时候,直接将数据入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

//offer:超时时间内入队数据
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //若队列已满
        while (count == items.length) {
            //无需等待超时,则直接返回false
            if (nanos <= 0)
                return false;
            //等待超时时间nanos后再检查队列是否还是满的    
            nanos = notFull.awaitNanos(nanos);
        }
        //若队列不满则做入队操作
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

//入队操作
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    //将数据插入队尾
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //通知其他等待获取数据的线程
    notEmpty.signal();
}

1.4、出队操作

//poll:非阻塞获取队列中的数据
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列为空,则返回null;否则返回队头数据
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//take:阻塞获取队列中数据
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //队列为空,则在notEmpty信号量上等待其他线程将数据放入队列
        while (count == 0)
            notEmpty.await();
        //获取队列中数据    
        return dequeue();
    } finally {
        lock.unlock();
    }
}

//poll:在超时时间内获取数据
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //队列为空,如无需等待,则直接返回null;否则等待nanos超时时间
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

//直接获取队头的数据,但不改变takeIndex值,即可重复peek()获取队头的数据
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

//获取队头数据
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知其他在notFull上等待的线程    
    notFull.signal();
    return x;
}

1.5、移除数据

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    
    //移除的是队头的数据?则直接移除队头数据,并修改takeIndex值
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            //将队列中的数据前移    
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                //队尾数据
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

2、LinkedBlockingQueue

LinkedBlockingQueue是基于单向链表的阻塞队列。

2.1、基本属性

//链表中的节点
static class Node<E> {
    E item;

    //后继节点
    Node<E> next;

    Node(E x) { item = x; }
}

//队列的容量
private final int capacity;

//当前队列中元素的个数
private final AtomicInteger count = new AtomicInteger();

//队列的头结点
transient Node<E> head;

//队列的尾节点
private transient Node<E> last;

//出队(take,poll等)的锁
private final ReentrantLock takeLock = new ReentrantLock();

//出队等待的信号量
private final Condition notEmpty = takeLock.newCondition();

//入队锁
private final ReentrantLock putLock = new ReentrantLock();

//入队等待的信号量
private final Condition notFull = putLock.newCondition();

2.2、入队方法

//阻塞入队数据
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //当前队列已满?则在notFull信号量上等待,直到有其他线程从队列中获取元素
        while (count.get() == capacity) {
            notFull.await();
        }
        //当队列未满时,将元素入队,并增加队列元素计数count
        enqueue(node);
        c = count.getAndIncrement();
        //若此时队列还未满,则在notFull中发信号通知下个在notFull等待的线程进行数据的入队操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //当队列已满时,在notEmpty发信号,通知其他等待此信号的线程
    if (c == 0)
        signalNotEmpty();
}

//超时时间内入队
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //队列已满则且无需等待,则直接返回false,否则在notFull上等待超时
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        //数据入队及元素计数
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

//非阻塞入队操作,入队成功返回true,否则为false,不进行阻塞操作
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

2.3、出队方法

//阻塞等待出队
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//超时等待出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//非阻塞等待出队
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

3、PriorityBlockingQueue

PriorityBlockingQueue为有优先级的阻塞队列实现。

3.1、基本属性

//默认初始化的队列数组容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;

//数组最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//队列数组,实际存储的是树型的数据,不过用数组来存储树
//实际结构为二叉树,而树的父节点一定比子节点大,但左右子节点没有大小的要求
private transient Object[] queue;

//队列中元素的个数
private transient int size;

//元素优先级比较器
private transient Comparator<? super E> comparator;

//对外操作的锁
private final ReentrantLock lock;

//notEmpty等待队列
private final Condition notEmpty;

//用户空间分配的自旋锁
private transient volatile int allocationSpinLock;

//用于序列化和反序列化的队列
private PriorityQueue<E> q;

3.2、入队操作

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //容量已满,需要扩容?
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        //将数据插入树中
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

//将元素向数的上层移动
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        //获取父节点下标
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        //若当前数据比父节点大,则无需上移,否则将当前节点上移
        if (key.compareTo((T) e) >= 0)
            break;
        //将父节点下移    
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

3.3、出队操作

//阻塞出队操作,在此只分析take,其他如poll,peek类似
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

//取出独立头部位置,即去除树中的根节点数据
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

//调整树结构
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

4、SynchronousQueue

SynchronousQueue为无缓冲的阻塞队列,其主要用来在两个线程之间交换数据。因其数据时无缓冲的。即当生产者生产数据的速度大于消费者消费数据的速度,会导致生产者线程长时间阻塞,导致服务不可用。

4.1、Transferer接口

SynchronousQueue中通过Transferer对数据进行交换。SynchronousQueue的出队及入队方法基本都是调用Transferer#transfer()方法进行处理的。而Transferer实现有两种,一个是公平交换的,一个是非公平交换的。

abstract static class Transferer<E> {
    //e:交换的数据,当为null时,表示要获取数据;当不为null时,表示插入数据
    //timed:true:运行超时;false:不允许超时
    //nanos:超时时间
    abstract E transfer(E e, boolean timed, long nanos);
}

4.2、TransferStack交换堆

TransferStack是非公平的交换堆,其不是某个消费者或生产者等待的时间越久就越先获取交换权。

TransferStack主要实现源码:

static final class TransferStack<E> extends Transferer<E> {
    //模式,表示当前操作是消费数据还是生产数据
    //REQUEST:需要消费数据
    //DATA:需要生产数据
    //FULFILLING:表示一对生产者和消费者已经匹配上了
    static final int REQUEST    = 0;
    static final int DATA       = 1;
    static final int FULFILLING = 2;

    //判断节点状态是否为:FULFILLING
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    //TransferStacks的节点数据结构
    static final class SNode {
        //堆中下个节点
        volatile SNode next;        // next node in stack
        //和当前节点匹配的节点
        volatile SNode match;       // the node matched to this
        //当前在节点等待的线程
        volatile Thread waiter;     // to control park/unpark
        //节点数据:null表示为消费者节点;否则为生产者节点
        Object item;                // data; or null for REQUESTs
        //当前节点模式:REQUEST或DATA或FULFILLING
        int mode;

        SNode(Object item) {
            this.item = item;
        }
            
        //cas设置打当前节点的next值   
        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        //尝试将当前节点与s节点配对
        boolean tryMatch(SNode s) {
            //cas设置当前节点的math节点
            if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                //唤醒与当前节点配对的math节点线程
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }
    }

    //交换堆的头结点
    volatile SNode head;

    //cas设置头结点
    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    @SuppressWarnings("unchecked")
    E transfer(E e, boolean timed, long nanos) {
        
        //数据交换处理,循环以下三个处理,直到数据交换成功或超时
        //1、若头结点为空或头结点的模式与当前节点一样,即都是生产者或消费者,
        //则直接将当前节点插入堆的头结点,并等待其他和当前节点能匹配的节点,
        //匹配上就返回匹配的节点,若节点取消则返回null
        
        //2、若头结点的模式与当前节点模式互相补足,即能配对,即一个生产者一个消费者,
        //则向堆中头部插入当前节点,并设置当前节点状态为fulfilling,设置匹配节点的math节点,
        //然后同时弹出当前节点与其math的节点,还会唤醒与其math的节点。
        
        //3、若堆的头结点已经与其他节点配对,
        //则进行配对处理并将配对的两个节点弹出
        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            //头结点为空或头节点与当前节点模式一致
            if (h == null || h.mode == mode) {  // empty or same-mode
                //若不需要超时,则直接将当前节点放入堆的头节点,或直接返回null
                if (timed && nanos <= 0) {      // can't wait
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);     // pop cancelled node
                    else
                        return null;
                //需要超时,则将节点插入头部并等待超时        
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        return null;
                    }
                    //匹配成功,将两个节点弹出
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
             //头结点未匹配上?   
            } else if (!isFulfilling(h.mode)) { // try to fulfill
                //头结点已取消?则更新头节点为下个节点
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                //更新头结点为当前节点,并设置当前节点模式为已配对模式
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        //节点匹配
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }
                }
            } else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

    //在超时时间内等待匹配上节点
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        //等待的截止时间点
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        //自旋次数
        int spins = (shouldSpin(s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            //自旋一定次数后线程阻塞自己等待配对线程唤醒
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            else if (s.waiter == null)
                s.waiter = w; // establish waiter so can park next iter
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

4.3、TransferQueue交换堆

TransferQueue是公平的交换堆,其等待越久的会先对其进行匹配操作,从而达到公平交换。

TransferQueue主要实现源码:

static final class TransferQueue<E> extends Transferer<E> {
    //交换器中节点数据结构
    static final class QNode {
        //下个节点
        volatile QNode next;          // next node in queue
        //节点数据
        volatile Object item;         // CAS'ed to or from null
        //在节点等待的线程
        volatile Thread waiter;       // to control park/unpark
        //当前节点是否为数据节点
        final boolean isData;
    }

    //头节点
    transient volatile QNode head;
    //尾节点
    transient volatile QNode tail;
    //已经取消的节点引用,可能这个节点还未从队列中移除
    transient volatile QNode cleanMe;

   //cas将nh设置为新的头节点
    void advanceHead(QNode h, QNode nh) {
        if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            h.next = h; // forget old next
    }

    //cas将nt设置为新的尾节点
    void advanceTail(QNode t, QNode nt) {
        if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }

    @SuppressWarnings("unchecked")
    E transfer(E e, boolean timed, long nanos) {
        
        //数据交换,处理流程同TransferStack大同小异
        //1、
        QNode s = null; // constructed/reused as needed
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                if (t != tail)                  // inconsistent read
                    continue;
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                if (timed && nanos <= 0)        // can't wait
                    return null;
                if (s == null)
                    s = new QNode(e, isData);
                if (!t.casNext(null, s))        // failed to link in
                    continue;

                advanceTail(t, s);              // swing tail and wait
                Object x = awaitFulfill(s, e, timed, nanos);
                if (x == s) {                   // wait was cancelled
                    clean(t, s);
                    return null;
                }

                if (!s.isOffList()) {           // not already unlinked
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    s.waiter = null;
                }
                return (x != null) ? (E)x : e;

            } else {                            // complementary-mode
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                if (isData == (x != null) ||    // m already fulfilled
                    x == m ||                   // m cancelled
                    !m.casItem(x, e)) {         // lost CAS
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                advanceHead(h, m);              // successfully fulfilled
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }

    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        /* Same idea as TransferStack.awaitFulfill */
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = ((head.next == s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel(e);
            Object x = s.item;
            if (x != e)
                return x;
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }
            if (spins > 0)
                --spins;
            else if (s.waiter == null)
                s.waiter = w;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }
}

5、LinkedTransferQueue

LinkedTransferQueue是基于单链表实现的阻塞队列,节点数据入队时可以有几种模式,如AYNC、SYNC、TIMED、NOW等。不同模式应用于不同的场景。LinkedTransferQueue相较于LinkedBlockingQueue,可以直接将数据交给等待的节点,而无需入队,性能更高;LinkedTransferQueue相较于SynchronousQueue则多了存储元素的缓冲队列,可实现非阻塞的数据入队操作,避免了SynchronousQueue因消费者慢于生产者而导致的生产者阻塞问题。

5.1、主要属性

//cpu数是否大于1
private static final boolean MP =
    Runtime.getRuntime().availableProcessors() > 1;

//在阻塞等待之前自旋的次数
private static final int FRONT_SPINS   = 1 << 7;

//节点被其他节点自旋操作的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

//sweepVotes的阀值
static final int SWEEP_THRESHOLD = 32;

//头结点
transient volatile Node head;

//尾节点
private transient volatile Node tail;

//累计到一定次数再清除无效node
private transient volatile int sweepVotes;

//节点状态
//实时模式,立即进行数据交换
private static final int NOW   = 0; // for untimed poll, tryTransfer
//异步模式,若数据交换不成功,则将数据入队
private static final int ASYNC = 1; // for offer, put, add
//同步模式,会一直等到交换成功
private static final int SYNC  = 2; // for transfer, take
//超时模式,等待一定的超时时间后才失败
private static final int TIMED = 3; // for timed poll, tryTransfer


//等待的节点数据结构
static final class Node {
    //是否为数据节点
    final boolean isData;   // false if this is a request node
    //节点数据
    volatile Object item;   // initially non-null if isData; CASed to match
    //下个等待的节点
    volatile Node next;
    //在当前节点等待的线程
    volatile Thread waiter; // null until waiting

    //cas设置下个节点
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    //cas设置节点数据
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

5.2、数据交换操作

private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race
        //
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            //p节点未被匹配到?
            if (item != p && (item != null) == isData) { // unmatched
                //节点模式不一样,无法匹配,直接跳出循环
                if (isData == haveData)   // can't match
                    break;
                //模式不一样,则cas尝试匹配    
                if (p.casItem(item, e)) { // match
                    //匹配成功,则cas将下个节点从队列中移除
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        //无下个节点了?
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    //唤醒配对的节点
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        //模式为非立即匹配?
        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, haveData);
            //尝试入队    
            Node pred = tryAppend(s, haveData);
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            //同步操作?则等待匹配    
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

//将节点入队,放入队尾
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        //队列为空,直接入队
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }
        //当前节点与队尾几点模式不同,无法入队?
        else if (p.cannotPrecede(haveData))
            return null;                  // lost race vs opposite mode
        //非队尾?    
        else if ((n = p.next) != null)    // not last; keep traversing
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        //cas插入队尾        
        else if (!p.casNext(null, s))
            p = p.next;                   // re-read on CAS failure
        else {
            //将数据添加到队尾
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

//等待数据节点匹配,直到超时
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e,说明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果当前线程中断了,或者有超时的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);
            return e;
        }
        // 如果自旋次数小于0,就计算自旋次数
        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        //自旋处理
        else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        }
        //需要超时等待,则阻塞当前线程等待
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            LockSupport.park(this);
        }
    }
}

5.3、入队及出队操作

入队出队(put、offer、add,take、poll等)操作,基本都是调用xfer进行数据交换操作,只是不同操作模式不同。

public void put(E e) {
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}


public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}


public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return xfer(null, false, NOW, 0);
}


上一篇下一篇

猜你喜欢

热点阅读