并发容器BlockingQueue - LinkedTransf

2019-07-13  本文已影响0人  王侦

1.测试代码

    public static void main(String[] args) {
        final LinkedTransferQueue<Long> queue = new LinkedTransferQueue<Long>();
        Runnable offerTask = new Runnable(){
            @Override
            public void run(){
                queue.offer(8L);
                System.out.println("offerTask thread has gone!");
            }
        };
        Runnable takeTask = new Runnable(){
            @Override
            public void run(){
                try {
                    System.out.println(Thread.currentThread().getId() + " " +queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        Runnable takeTaskInterrupted = new Runnable(){
            @Override
            public void run(){
                Thread.currentThread().interrupt();
                try {
                    System.out.println(Thread.currentThread().getId() + " " +queue.take());
                } catch (InterruptedException e) {
                    System.out.println(e + " "+Thread.currentThread().getId());
                }
            }
        };


        new Thread(takeTask).start();
        new Thread(takeTaskInterrupted).start();
        new Thread(offerTask).start();
        new Thread(takeTask).start();
    }

调试步骤:

java.lang.InterruptedException 14
offerTask thread has gone!

Node pred = tryAppend(s, haveData);
if (pred == null)
     continue retry;    
 else if (p.cannotPrecede(haveData))
                return null;  
        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

重试的情况,发现与头结点的模式一致,则尝试添加到链表尾,又会重现上一幕,结果就是死循环。

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;

2.为什么put会入队?

这里有一个前提,所有线程都停留在tryAppend(s, haveData);处,所以当Thread-0 take()和Thread-1 takeTaskInterrupted运行完毕后,Thread-2 offer()是从tryAppend开始运行的,跳过了xfer()方法前面对于队头的匹配操作。

但是offer和take模式不一致,为什么会入队?

        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

还是p.cannotPrecede(haveData)这个地方检测的问题,其中的(x = item) != this,由于上一个take()被中断了,该结点是处于CANCEL状态,其item指向自己。 导致这里即使模式不一致,也会入队,出现错误。

那么本质原因就是:这里只有cannotPrecede()判别是不正确的,需要再加一个判断,即遇到CANCEL结点,应该retry。

上一篇下一篇

猜你喜欢

热点阅读