JDK源码解析

JDK1.8 SynchronousQueue源码解析

2020-04-27  本文已影响0人  i砖工

同步队列:它继承了一般的AbstractQueue和实现了BlockingQueue接口。它与其它的BlockingQueue最大的区别就在它不存储任何数据,它的内部是一个栈(非公平)或者一个队列(公平策略)用来存储访问SynchronousQueue的线程,而访问它的线程有消费者和生产者,对应于方法put和take。当一个生产者或者消费者试图访问SynchronousQueue的时候,如果找不到与之能够配对的消费者或者生产者,则当前线程会阻塞,直到对应的线程将其唤醒,或者等待超时,或者中断。
以下是它的put和take方法的实现,不管是put还是take,其核心都是调用transferer对象的transfer方法,所以要弄明白SynchronousQueue,就需要弄清楚Transferer。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

我们先从整体上来看一下SynchronousQueue的内部结构:


类结构图

从类图上可以看出,SynchronousQueue内部引用了一个Transfer,而Transfer的实现有两种,一个是TransferStack,一个是TransferQueue.
今天我们分析的重点对象是TransferStack.
1.先来看下TransferStack是如何初始化的:

//SynchronousQueue的构造方法
public SynchronousQueue(boolean fair) {
      //如果初始化为非公平策略,则使用TransferStack
      transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

2.仔细看一下TransferStack的实现:

static final class TransferStack<E> extends Transferer<E> {
        //代表本次transfer的操作是获取数据
        static final int REQUEST    = 0;
       //代笔本次transfer的操作是放入数据
        static final int DATA       = 1;
       //代表节点正在配对
        static final int FULFILLING = 2;
       //判断节点目前是否在匹配中,3&2 == 2(匹配中)  
       //2&2 == 2(匹配中)    1&2==0 , 0&2==0 (等待匹配)
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** 内部类,栈节点*/
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // 配对的节点,如果未配对则未空
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

           /**
           尝试配对
           s:被配对的节点
           配对的逻辑为:通过CAS设置节点的match属性,如果能设置成功,则说明配对成功,配对成功后,再通过LockSupport.unpark(w);
将其对应的等待线程唤醒。
          这个方法比较简答,就不再累述
           **/
            boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             如果一个节点等待超时,或者线程被中断,则需要取消节点的等待,而取消等待的标志就是将match指向自己
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            //Unsafe机制, 不懂的就不要看juc包了,先搞懂Unsafe再看
            private static final sun.misc.Unsafe UNSAFE;
            private static final long matchOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** The head (top) of the stack */
        //栈顶节点
        volatile SNode head;
       //重新CAS栈顶节点,一般都用在栈顶元素出栈的时候
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        //构造栈节点
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        //关键方法transfer, 通过e是否为空来判断本次操作是获取元素还是放入元素
        E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;  //e为空, 表示该次请求为获取元素,e不为空则是生产放入元素
            for (;;) {
                SNode h = head;
                //①当头节点为空,则新加入节点直接入栈,然后进入等待,当头节点不为空,并且本次操作的模式和头节点模式一样,则继续进入栈等待,否则去到代码②
                if (h == null || h.mode == mode) {  // empty or same-mode       
                    if (timed && nanos <= 0) {      // can't wait //不需要等待
                        if (h != null && h.isCancelled()) //头节点已经中断或者超时,重新设置头节点
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null; //由于不能等待,而头节点与本次入队列的节点操作模式相同,所以直接返回空(如果可以等待的话会进入自旋)
                    } else if (casHead(h, s = snode(s, e, h, mode))) { //新节点入栈
                        SNode m = awaitFulfill(s, timed, nanos); //等待该节点被配对或者超时中断,详情见下面的方法具体解释。
                        if (m == s) {               // wait was cancelled
                            clean(s);  //清理从head节点到s.next节点之间的“取消”节点
                            return null;  //返回null,线程被中断或者节点等待超时,调用者(上层应用)能处理线程中断逻辑。
                        }
                
                         //s节点从awaitFulfill中出来,说明是配对成功节点将其唤醒,而唤醒的节点一定是栈顶节点(见代码②逻辑,因为每次只允许有一个节点进入配对状态,
                         //进入配对状态后,其它节点无法加入栈,只有等到配对成功,重新casHead后其它节点才能入栈,所以这里直接判断头节点)
                        if ((h = head) != null && h.next == s) //因为匹配线程那边也可能在调用casHead,所以这里h.next == s判断一下head是否已经被重置  
                            casHead(h, s.next);     // help s's fulfiller   
                        
                        return (E) ((mode == REQUEST) ? m.item : s.item); //如果是消费者则返回匹配上的m.item, 如果是生产者,则返回生产的节点的item
                     }
                }
                //②     
                //本次操作模式和头节点模式不相等,先检查头节点模式是否为匹配中。
                //头节点模式为 3或者2时为匹配中,这时任何新增的节点都无法入栈,因为任何新增操作的模式都不等于3或者2,所以在这次匹配完成之前,head节点是不会变化的。
                else if (!isFulfilling(h.mode)) { 
                    //头节点模式为:0或者1
                    //head节点已经取消,所以将head指向下一个节点,head出栈
                    if (h.isCancelled())            
                         casHead(h, h.next);         // pop and retry 
            
                    //②-1
                    //如果头节点模式为0或1,则本次操作的节点入栈,入栈的模式为:2或者3, ps(这个时候头节点判断isFullfilling则返回true),
                    //所以一旦有节点在匹配中,则其它新增节点都会直接去到代码③
                    //FULFILLING=2,  2|0 == 2, 2|1==3
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { 
            
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            //如果新的头节点后面没有等待节点了(等待节点有可能被中断或者超时出栈了),则清空栈,自旋重新开始
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            //②-2如果下一个节点不为空,则尝试匹配
                            SNode mn = m.next;
                            if (m.tryMatch(s)) { 
                                casHead(s, mn);     // pop both s and m //匹配成功,则弹出头节点和等待节点
                                //如果本次操作是获取数据,则返回匹配上的生产数据的item,相反, 如果本次操作是生产数据,则直接返回自己生产的数据。
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                //如果匹配失败(理论上一次只有一个节点能够进行匹配,见②-1,所以理论上不应该会匹配失败,但是为什么会失败呢?
                                //因为匹配的m还有可能超时或者被中断),所以如果匹配失败,弹出m节点。
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } 
                //③如果头节点不为空,并且本次操作的模式和头节点的模式不相同,并且头节点是正在匹配
                else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    //当②中新的匹配节点进入后,有可能等待节点超时或者被中断了,即waiter is gone
                    if (m == null)                  // waiter is gone
                    //删除匹配节点,栈清空重新开始,因为最外层有一个自旋,所以又会重新安置这个节点。
                    casHead(h, null);           // pop fulfilling node
                    else {
                        //这一段代码逻辑与②-2相同
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        //让节点进入等待(通常发生在节点入栈时发现没有对应的匹配者)
        //根据设置的是否等待超时条件,让当前线程进入等待配对的自旋逻辑中,只有当配对成功,或者线程中断才会退出。
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())  //当前线程已经中断,则取消当前节点的配对等待
                    s.tryCancel();
                SNode m = s.match;
                if (m != null) //当前节点已经配对成功,或者取消(m==s)
                    return m;
                if (timed) { //有超时设置
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) { //已经超时
                        s.tryCancel(); //取消s节点的等待
                        continue;
                    }
                }
                //不超时(一直等待),或者设置超时时间后还没有超时。
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0; //减少循环次数
                else if (s.waiter == null) //不再自旋,设置线程
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed) //如果设置的是一直等待,则睡眠当前线程
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold) //如果是有时间的等待,但是如果还剩下小于1秒的时间,则不再park,直接继续自旋
                    LockSupport.parkNanos(this, nanos);
            }
        }


        //判断是否还需要继续自旋还是睡眠
        //这里要注意的是该方法只在awaitFulfill方法被调用,而awaitFulfill只在入栈元素和栈顶元素操作模式一致时(者说明栈顶元素不是匹配中状态,见代码②处的说明)
        boolean shouldSpin(SNode s) {
            SNode h = head;
            //这个逻辑尤其是h==null确实没看明白什么时候会出现h!=s然后h == null,因为s入栈时head会指向s,就算s在自旋过程中有新的head节点加入,那h也不会为null。 先打???
           //但是如果我来写这个方法的话, 我我会写一个shouldNotSpin,如下
            return (h == s || h == null || isFulfilling(h.mode));
        }
       //s不在栈顶,并且栈顶元素不在匹配中,则栈顶以后的元素可以暂时不用自旋,这样逻辑就很容易说得通
       //因为栈顶元素还没有进来匹配者,说明栈顶元素和s一样属于等待者(前者都还没被匹配,s自然不会被匹配,所以先睡一会儿)
        boolean shouldNotSpin(Snode s){
            Snode h = head;
            return (h!=null && h!=s && !isFulfilling(h.mode));
        }

        //清理节点出栈
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread
            //这里为什么要尝试下一个,而不到下下一个????
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;
        
            // Absorb cancelled nodes at head
            //清理头节点到past节点之间,从头节点开始连续的“取消”状态的节点(主要是清理头节点)
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);
        
            // Unsplice embedded nodes
            //清理头节点到past节点之间,跳跃的被“取消”的节点(主要是清理头节点与past中间的节点)
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

核心逻辑就是以上的代码,只要记住:SynchronousQueue不存储实际的数据,它的栈或者队列中存储的是生产者和消费者节点,最开始进入栈的节点就成为了等待者(这里不管是生产还是消费),而后面进入的节点都需要根据操作的mode来判断是继续入栈等待还是入栈后立即进行匹配。

上一篇下一篇

猜你喜欢

热点阅读