大飞老师带你看线程(并发容器-SynchronousQueue)

2019-04-19  本文已影响0人  叩丁狼教育

本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。

接上一篇, 本篇讲SynchronousQueue队列非公平策略put与take操作

源码分析

2:非公平锁策略- put / take

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

put 跟 take 方法有都调用:
调用transfer 方法:
put : transferer.transfer(e, false, 0)
take: transferer.transfer(null, false, 0);
从前面内部结构看: SynchronousQueue 非公平策略底层实际上委托给TransferStack 栈实现, 而内部存储数据使用SNode 栈节点 来看下节点源码:

static final class SNode {
    volatile SNode next;        //下一个节点    
    volatile SNode match;       //相匹配的节点
    volatile Thread waiter;     //线程挂起与唤醒控制: park /unpark      
    Object item;                //节点内容    
    //模式控制变量
    //非公平策略模式有3种:
    //REQUEST:表示消费数据-take操作
    //DATA:表示生产数据-put操作
    //FULFILLING:介于上2个状态间
        //多一个状态原因:TransferStack 配对操作原理是:每个线程配对时,都会先加入到栈顶中, 不管是take还是put,如果此时发现入栈的线程跟栈内线程可以配对,那么此时线程可以使用FULFILLING状态类标记:将要执行配对逻辑线程, 提示其他配对线程,配对时跳过这个节点,匹配其他的线程。
    int mode;

    //cas原子操作, 设置next节点
    boolean casNext(SNode cmp, SNode val) {...}
    //节点匹配,匹配成功,则unpark等待线程
    boolean tryMatch(SNode s) {...}
    //取消节点, 将节点内容设置为自己
    void tryCancel() {...}
    //判断是否操作结束
    boolean isCancelled() {...}
}

此处研究的是公平锁策略, 所以, 此时的transfer变量执项的是: TransferStack 类的实例

E transfer(E e, boolean timed, long nanos) {
    SNode s = null; 
    //根据transfer方法参数e判断当前操作模式
    //有值DATA 反之为REQUEST 
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        SNode h = head;  //初始时head为null
        //第一次put或take h==null成立, 如果已经存在挂起线程,入栈, 不管take或put mode必须一致才可以进入,此分支是入栈分支,不会进行配对
        if (h == null || h.mode == mode) {  
            if (timed && nanos <= 0) {  //超时操作   
                //队列中有配对线程,但是配对线程被取消了
                if (h != null && h.isCancelled()) 
                    casHead(h, h.next);   //下移动栈顶节点,出栈
                else
                    return null;
             //满足并进入此分支, 创建新节点,当前线程入栈成为栈顶节点
            } else if (casHead(h, s = snode(s, e, h, mode))) {
             //一旦栈顶位置移动成功, 挂起当前线程,等待配对线程
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {             
                    clean(s);  //一旦配对成功, 对应的线程出栈(被清除)
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     
                return (E) ((mode == REQUEST) ? m.item : s.item); //返回结果
            }
        //出现mode不同操作时,那么就需要配对了,此时需要满足单前栈顶节点不是正在配对节点, 所以做了排除其他也企图对栈顶节点配对的竞争线程操作
        } else if (!isFulfilling(h.mode)) { 
            if (h.isCancelled())  //检查,如果栈顶节点配其他线程配对成功,栈顶节点下移
                casHead(h, h.next);   
            //如果没有竞争线程, 线程入栈成为栈顶节点, 同时加正在配对状态
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) {
                    //获取目标配对的节点
                    SNode m = s.next;    
                    //如果目标配对节点为null,  则表示目标配对节点被其他线程抢走了,
                    if (m == null) {      
                        casHead(s, null);   //从新移动栈顶节点为null
                        s = null;     //配对失败, 跳出循环, 从新循环判断     
                        break;              
                    }
                    //如果不为null, 获取目标配对节点下一个节点
                    SNode mn = m.next;
                    if (m.tryMatch(s)) { //尝试配对
                        casHead(s, mn);  //配对成功,移动栈顶节点到mn, 同时配对的2个节点同时出栈, 返回结果  
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                
                        //如果配对失败,寻找原目标节点下一个节点,重新循环配对
                        s.casNext(m, mn);  
                }
            }
        } else {  //进入这个分支,表示目标配对的节点是一个正在配对的节点
            //直接跳过,配对下一个节点 , 接下逻辑跟上面配对一样                     
            SNode m = h.next;            
            if (m == null)               
                casHead(h, null);         
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))         
                    casHead(h, mn);        
                else                        
                    h.casNext(m, mn);       
            }
        }
    }
}

非公平策略操作流程图:


添加元素
获取元素

这里需要说明, 如果元素过多,并发时,配对的线程具体要配对谁,那么就随机啦,无法按照图中理想的配对顺序。这也是为什么说是非公平策略啦。

总结:
结合代码: transferer执行流程可归纳为以下:
1: transferer调用transfer方法实现SynchronousQueue 公平队列的take跟put操作
2:不管是take或者put 线程只要跟栈顶节点模式一样,都要入栈,然后通过自旋方式寻找匹配,找不到配对挂起。如果时间消耗完毕,或者被取消,直接出列。
3:如果找到配对的节点,将当前线程打上FULFILLING标记,尝试配对。如果是此时有竞争配对线程,检查到该节点有FULFILLING标记,直接跳过,寻找下一个配对节点。
4:如果配对成功, 弹出当前配对成功2个节点。如果配对失败,从头循环。

想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

上一篇下一篇

猜你喜欢

热点阅读