白话线程池(二):线程池的任务队列
线程池的任务对列参数workQueue,是一个阻塞对列,有以下三种通用策略
同步阻塞对列:SynchronousQueue
SynchronousQueue是 public interface BlockingQueue<E>
队列的一种实现。
BlockingQueue是一个阻塞对列
1 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;
2 也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入
SynchronousQueue
的同步,并不是指多线程并发直接的同步,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走
;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作
。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程.
实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列
借用网络上一篇文章的比喻就是
- 如果以洗盘子的比喻为例,那么这就相当于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工作单元可以交付之前,必须通过串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)
- 直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列——这种区别就好比将文件直接交给同事,还是将文件放到她的邮箱中并希望她能尽快拿到文件。
- 因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列
请看示例代码
package ke.com;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* Created by zl on 2019/5/10.
*/
public class QueueTest {
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
new Thread(queueProducer).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer2).start();
}
// 同步阻塞对列的生产者
static class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockQueue;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockQueue = queue;
}
@Override
public void run() {
while (true) {
String data = UUID.randomUUID().toString();
System.out.println("Put:" + data);
try {
blockQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockQueue;
public SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = blockQueue.take();
System.out.println(Thread.currentThread().getName() + " take(): " + data);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
运行上面的程序可以看到,插入数据的线程和读取数据的线程会交替进行

我们知道ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue(BlockingQueue接口的一种实现)作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的线程用于对这个入队列失败的任务进行处理(但有一个前提是此时线程池的大小还未达到其最大线程池大小)。
所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量
。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。
这个时候不妨考虑SynchronousQueue。SynchronousQueue实现上并不使用缓存空间。使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。