任务队列实现心得
2018-12-07 本文已影响6人
草蜢的逆袭
近日,研究了一下任务队列,于是想到了阻塞队列BlockingQueue,不得不提起到它的两个方法,put,take这两个方法都是阻塞式的,当队列满时,put方法阻塞,当队列空时,take方法阻塞.
我们使用BlockingQueue,只是完成了基本功能FIFO,就是任务先进先出。我们想要实现任务LIFO,这种方式就不行了。需要使用到BlockingDque,双端队列,在队列的两端都可以插入和获取元素,还有阻塞的这一特性。所以我们在使用中,使用的是BlockingDque,来实现我们的需求。
非阻塞队列-FIFO
public class NoneBlockingQTesterFIFO implements ITester {
private static int MAX_SIZE = 10;
private static LinkedList<Integer> queue = new LinkedList<>();
private Consumer consumer;
private Producer producer;
private static Random random = new Random();
public NoneBlockingQTesterFIFO() {
consumer = new Consumer();
producer = new Producer();
}
@Override
public void test() {
producer.start();
consumer.start();
}
public static boolean isFull() {
return queue.size() == MAX_SIZE;
}
public static boolean isEmpty() {
return queue.size() == 0;
}
static class Producer extends Thread {
int c = 0;
@Override
public void run() {
System.out.println("Producer run consume ...");
produce();
}
private void produce() {
System.out.println("Producer run produce ...");
while (c <= 20) {
synchronized (queue) {
while (queue.size() == MAX_SIZE) {
try {
System.out.println("队列满了...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
// 通知其它线程取数据
queue.notify();
}
}
System.out.println("");
// 每次插入一个元素
int v = random.nextInt(100);
queue.offer(v);
c++;
System.out.println("添加了数据: " + v);
queue.notify();
// System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
}
}
}
}
static class Consumer extends Thread {
@Override
public void run() {
System.out.println("Consumer run...");
consume();
}
private void consume() {
System.out.println("Consumer run consume ...");
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
try {
System.out.println("队列空了...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
System.out.println("");
// 每次移走队首元素
Integer v = queue.poll();
System.out.println("取到了数据 " + v);
queue.notify();
// System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
}
}
}
}
/**
添加了数据: 45
添加了数据: 58
添加了数据: 82
添加了数据: 20
添加了数据: 29
添加了数据: 18
添加了数据: 39
添加了数据: 83
添加了数据: 82
添加了数据: 41
队列满了...
取到了数据 45
取到了数据 58
取到了数据 82
取到了数据 20
取到了数据 29
取到了数据 18
取到了数据 39
取到了数据 83
取到了数据 82
取到了数据 41
队列空了...
添加了数据: 51
添加了数据: 91
添加了数据: 76
添加了数据: 46
添加了数据: 34
添加了数据: 38
添加了数据: 19
添加了数据: 36
添加了数据: 86
添加了数据: 14
队列满了...
取到了数据 51
取到了数据 91
取到了数据 76
取到了数据 46
取到了数据 34
取到了数据 38
取到了数据 19
取到了数据 36
取到了数据 86
取到了数据 14
*/
}
非阻塞队列的-LIFO
public class NoneBlockingQTesterLIFO implements ITester {
private static int MAX_SIZE = 10;
private static LinkedList<Integer> queue = new LinkedList<>();
private Consumer consumer;
private Producer producer;
private static Random random = new Random();
public NoneBlockingQTesterLIFO() {
consumer = new Consumer();
producer = new Producer();
}
@Override
public void test() {
producer.start();
consumer.start();
}
public static boolean isFull() {
return queue.size() == MAX_SIZE;
}
public static boolean isEmpty() {
return queue.size() == 0;
}
static class Producer extends Thread {
int c = 0;
@Override
public void run() {
System.out.println("Producer run consume ...");
produce();
}
private void produce() {
System.out.println("Producer run produce ...");
while (c <= 20) {
synchronized (queue) {
while (queue.size() == MAX_SIZE) {
try {
System.out.println("队列满了...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
// 通知其它线程取数据
queue.notify();
}
}
System.out.println("");
// 每次插入一个元素
int v = random.nextInt(100);
queue.offerFirst(v);
c++;
System.out.println("添加了数据: " + v);
queue.notify();
// System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
}
}
}
}
static class Consumer extends Thread {
@Override
public void run() {
System.out.println("Consumer run...");
consume();
}
private void consume() {
System.out.println("Consumer run consume ...");
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
try {
System.out.println("队列空了...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
System.out.println("");
// 每次移走队首元素
Integer v = queue.pollFirst();
System.out.println("取到了数据 " + v);
queue.notify();
// System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
}
}
}
}
/**
*添加了数据: 39
添加了数据: 80
添加了数据: 58
添加了数据: 87
添加了数据: 73
添加了数据: 94
添加了数据: 0
添加了数据: 52
添加了数据: 23
添加了数据: 59
队列满了...
取到了数据 59
取到了数据 23
取到了数据 52
取到了数据 0
取到了数据 94
取到了数据 73
取到了数据 87
取到了数据 58
取到了数据 80
取到了数据 39
队列空了...
添加了数据: 38
添加了数据: 42
添加了数据: 87
添加了数据: 53
添加了数据: 33
添加了数据: 32
添加了数据: 68
添加了数据: 13
添加了数据: 59
添加了数据: 43
队列满了...
取到了数据 43
取到了数据 59
取到了数据 13
取到了数据 68
取到了数据 32
取到了数据 33
取到了数据 53
取到了数据 87
取到了数据 42
取到了数据 38
队列空了...
添加了数据: 39
取到了数据 39
队列空了...
*/
}
阻塞队列之双端队列的使用
public class LinkedBlockingDqTester implements Itester {
int MAX_ITEM_SIZE = 10;
private LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>(MAX_ITEM_SIZE);
private int mFlag;
public LinkedBlockingDqTester(int flag) {
this.mFlag = flag;
}
@Override
public void test() {
switch (mFlag) {
case 0:
fifo();
break;
case 1:
lifo();
break;
}
}
// put -> [9,8,7,6,5,4,3,2,1,0]
// get -> [9]
// [8,7,6,5,4,3,2,1,0]
// put -> [10,8,7,6,5,4,3,2,1,0]
// get -> [10]
// [8,7,6,5,4,3,2,1,0]
// put -> [11,8,7,6,5,4,3,2,1,0]
// get -> [11]
// [8,7,6,5,4,3,2,1,0]
// put -> [12,8,7,6,5,4,3,2,1,0]
// get -> [12]
// [8,7,6,5,4,3,2,1,0]
// put -> [13,8,7,6,5,4,3,2,1,0]
// get -> [13]
// [8,7,6,5,4,3,2,1,0]
// put -> [14,8,7,6,5,4,3,2,1,0]
// get -> [14]
// [8,7,6,5,4,3,2,1,0]
// get -> [8]
// get -> [7]
// get -> [6]
// get -> [5]
// get -> [4]
// get -> [3]
// get -> [2]
// get -> [1]
// get -> [0]
/**
* put -> putFirst
* take -> takeFirst
*/
private void lifo() {
new Thread() {
public void run() {
for (int i = 0; i < 15; i++) {
try {
// 解决避免线程执行过快,带来的问题
System.out.println("");
queue.putFirst(i);
System.out.println("存数据 " + i);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(e);
}
}
};
}.start();
new Thread() {
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
while (true) {
try {
// 解决避免线程执行过快,带来的问题
System.out.println("");
Integer v = queue.takeFirst();
System.out.println("取数据 = " + v);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(e);
}
}
};
}.start();
}
// put(putLast) -> [0,1,2,3,4,5,6,7,8,9]
// get(getFirst) -> [0,1,2,3]
// [4,5,6,7,8,9]
// put(putLast) -> [4,5,6,7,8,9,10]
// [4,5,6,7,8,9,10]
// get(getFirst) -> [4,5,6,7]
// [8,9,10]
// put(putLast) -> [8,9,10,11]
// get(getFirst) -> [8,9]
// [10,11]
// put(putLast) -> [10,11,12,13]
// [10,11,12,13]
// get(getFirst) -> [10]
// [11,12,13]
// put(putLast) -> [11,12,13,14]
// get(getFirst) -> [11]
// get(getFirst) -> [12]
// get(getFirst) -> [13]
// get(getFirst) -> [14]
/**
* put -> putLast
* take -> takeFirst
*/
private void fifo() {
new Thread() {
public void run() {
for (int i = 0; i < 15; i++) {
try {
queue.put(i);
System.out.println("存数据 " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
new Thread() {
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println(queue.toArray());
while (true) {
try {
Integer v = queue.take();
System.out.println("取数据 = " + v);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
}
}
重点说明
本项目中的代码,有部分是参照网络上的资料进行的修改,如有雷同,敬请谅解。