并发容器BlockingQueue - LinkedTransf
2019-07-13 本文已影响0人
王侦
1.测试代码
public static void main(String[] args) {
final LinkedTransferQueue<Long> queue = new LinkedTransferQueue<Long>();
Runnable offerTask = new Runnable(){
@Override
public void run(){
queue.offer(8L);
System.out.println("offerTask thread has gone!");
}
};
Runnable takeTask = new Runnable(){
@Override
public void run(){
try {
System.out.println(Thread.currentThread().getId() + " " +queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable takeTaskInterrupted = new Runnable(){
@Override
public void run(){
Thread.currentThread().interrupt();
try {
System.out.println(Thread.currentThread().getId() + " " +queue.take());
} catch (InterruptedException e) {
System.out.println(e + " "+Thread.currentThread().getId());
}
}
};
new Thread(takeTask).start();
new Thread(takeTaskInterrupted).start();
new Thread(offerTask).start();
new Thread(takeTask).start();
}
调试步骤:
-
step1.四个线程同时运行到xfer()方法的Node pred = tryAppend(s, haveData);处
- step2.Thread-0 take()运行,Thread-1 takeTaskInterrupted运行,Thread-2 offer()运行(Intellij IDEA debug F9),结果如下:
java.lang.InterruptedException 14
offerTask thread has gone!
- step3.Thread-3 take()运行
找到队列的last结点,发现与last结点模式不一致且last结点未匹配,则会重试。
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry;
else if (p.cannotPrecede(haveData))
return null;
/**
* Returns true if a node with the given mode cannot be
* appended to this node because this node is unmatched and
* has opposite data mode.
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
重试的情况,发现与头结点的模式一致,则尝试添加到链表尾,又会重现上一幕,结果就是死循环。
retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
2.为什么put会入队?
这里有一个前提,所有线程都停留在tryAppend(s, haveData);处,所以当Thread-0 take()和Thread-1 takeTaskInterrupted运行完毕后,Thread-2 offer()是从tryAppend开始运行的,跳过了xfer()方法前面对于队头的匹配操作。
但是offer和take模式不一致,为什么会入队?
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
还是p.cannotPrecede(haveData)这个地方检测的问题,其中的(x = item) != this,由于上一个take()被中断了,该结点是处于CANCEL状态,其item指向自己。 导致这里即使模式不一致,也会入队,出现错误。
那么本质原因就是:这里只有cannotPrecede()判别是不正确的,需要再加一个判断,即遇到CANCEL结点,应该retry。