Java并发编程笔记(八):线程池的使用
线程池是提高线程复用的技术手段,熟悉使用线程池可以提高应用程序的并发性能,但有些配置反而导致性能的降低,故我们也应该需要了解线程池的相关配置以避免这种类型的问题。
一、在任务与执行策略之间的隐性耦合
Executor框架可以将提交任务和任务的执行策略解耦开来。但不是所有的任务都适合所有的执行策略,有些任务是需要明确指定其执行策略的,包括:
- 依赖性任务。行为正确的任务大多是独立的,但不可避免的会出现一些依赖性任务,这些任务依赖其他的任务,此时必须小心的维持执行策略以避免活跃性问题。
- 使用线程封闭机制的任务。这种类型的任务被封闭在执行该任务的线程中,如果此时转换到线程池的环境,那么可能会丢失线程安全性。
- 对时间敏感的任务。这类任务对时间要求是很严格的,如果执行策略选择不当,可能会造成任务提交和任务处理完成之间的时间差很大,导致用户体验大大降低。
- ThreadLocal任务。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal任务才是有意义的,而在线程池中的线程池不应该使用ThreadLocal在任务之间传递值。
......
只有当任务都是通类型且相互独立的时,线程池的性能才能达到最佳,否则将可能造成“阻塞”,如果提交的任务依赖与其他任务,除非线程池无限大,否则将可能造成死锁。
线程饥饿死锁
在线程池中,如果一个任务依赖其他的任务,除非线程池无限大(这几乎是不可能的),否则将有可能造成死锁。例如在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor中,并等待这个被提交任务的执行结果,那么通常将引发死锁。因为线程只有一个,即使提交了任务,因为当先执行的任务正在等待这个刚刚提交的任务,所以提交的任务没有线程来执行,最终导致当前执行的任务永远无法获取到子任务的结果造成死锁。这种情况被称为“饥饿死锁”
运行时间较长的任务
如果一个任务运行时间较长,即使不会造成死锁,也会对应用程序的响应性造成不良影响。有一项技术可以缓解执行时间较长的任务造成的影响,即限定任务等待资源的时间,而不要无限制的等待。例如JAVA NIO里的Selector.select()方法有一个指定超时时间的版本,当超时事件发生时,select从阻塞中醒来,执行后面的逻辑。Netty利用这一点解决了NIO中臭名昭著的CPU空转问题。
二、设置线程池的大小
线程池的大小不是那么容易确定的,一般需要根据系统的环境,资源多少,有多少个CPU,多少内存,计算任务是CPU密集的,还是IO密集的等等条件来动态设置。过大或者过小都会造成资源的浪费或者过度竞争。
对于计算密集型的任务,在用用N个CPU的机器上,当线程池的大小为N+1时,一般能实现最优的利用率。而对于包含IO操作的任务来说,需要比N+1更多的线程,这是因为线程不会一直执行任务。对于IO密集型的任务来说,更加看重的时IO资源而不是CPU资源,故线程池的大小可以稍微根据系统要求调整。
三、配置ThreadPoolExecutor
Executors里的工厂方法会返回ThreadPoolExecutor实例,ThreadPoolExecutor是一个灵活的,稳定的线程池,允许进行各种定制。
如果默认的执行策略不满足需求,可以根据自己的需求对其进行定制,一般是通过此构造函数来定制一些参数,例如线程池大小,拒绝策略等。如下代码即ThreadPoolExecutor的通用构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:线程池的基本大小,即没有任务执行时线程池的大小,只有在工作队列满的情况下才会创建超出这个值的线程。
- maximumPoolSize: 线程池的最大大小,表示线程池允许持有线程的最大数量。结合corePoolSize,可以保证线程池的大小在[corePoolSize,maximumPoolSize]区间内。
- keepAliveTime。线程的存活时间,如果某个线程的空闲时间大于存活时间,那么将被标记为可回收的,并且当线程池的当前大小大于基本大小时,这个线程将被终止。
调节以上三个数值,可以使得应用程序拥有一定的伸缩性,提高程序的整体性能。但设置不当,可能会导致资源浪费或者过度竞争等。
- workQueue: 即工作队列,具体来说是BlockingQueue的实现类,例如SynchronousQueue,LinkedBlockingQueue等,具体使用哪个实现类需要根据需求决定。
- threadFactory:创建线程的工厂,ThreadFactory是一个接口,接口只有一个方法newThread(),实现这个接口并在newThread方法里实现创建线程的逻辑即可。故这个参数也是可以自定义的。
- handler :即拒绝执行处理器。当工作队列满或者线程数量达到最大线程数量时,新提交的任务会被拒绝,RejectedExecutionHandler 中的rejectedExecution()方法会被调用。拒绝策略在JDK中有一些实现,最简单的AbortPolicy会直接抛出RejectedExecutionException异常,其他的实现类可以查看源码,不再赘述。
在调用构造函数之后再定制ThreadPoolExecutor
我们可以先调用ThreadPoolExecutor的构造函数获得一个实例后,再调用一些set方法设置参数。ThreadPoolExecutor提供了如下图所示的set方法:
set方法
我个人认为先调用构造函数再定制ThreadPoolExecutor的最大意义是方便动态的调整参数,以提高并发应用程序的灵活性。
四、扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,查看源码可以发现其beforeExecute,afterExecute和terminated方法的方法体是空的,故其子类完全可以重写这些方法来实现一些功能,例如统计日志等,这有点类似于AOP技术(目前看来,仅仅最终功能是差不多,但是AOP主要是使用代理来实现的,会更加复杂一些)。
五、递归算法的并行化
递归算法代码通常比较简洁,但它其实是比较依赖上一步计算结果的,这一特性使得直接将单线程转换成多线程不是一个好主意。一般情况下,递归算法每一层都可以看做是一个小任务,整个递归可以看做是一个大任务,如果我们能对大任务进行合理的切分,那么就可以使用Fork-Join类似的框架来处理小任务,合并小任务的结果,从而得到最终结果。
例如,现在我们需要统计某个目录下的文件数量,直接写一个递归程序会非常简单,只需要判断当前的文件类型是否是目录,如果是目录,递归的调用该方法,否则将数量加1,最后返回数量即可。这是单线程的解法,如果是多线程的话,我个人会选择Fork-join框架来做这件事。Demo如下所示:
递归单线程查找给定包名下的所有.class文件
private static void findClassesByNormal(String packageName, String filePath, Set<Class<?>> classSet) {
File dir = new File(filePath);
//如果该文件不存在或者不是目录,那么直接返回
if (!dir.exists() || !dir.isDirectory()) {
return;
}
//列出当前目录的所有符合要求的项
File[] dirFiles = dir.listFiles(pathName -> pathName.isDirectory() || pathName.getName().endsWith(".class"));
if (dirFiles == null || dirFiles.length == 0) {
return;
}
String className;
Class clz;
for (File file : dirFiles) {
if (file.isDirectory()) {
//如果是目录,就递归进入目录
findClassesByNormal(packageName + "." + file.getName(), filePath + "/" + file.getName(), classSet);
continue;
}
//否则直接加载类文件
className = file.getName().substring(0, file.getName().length() - 6);
clz = loadClass(packageName + "." + className);
classSet.add(clz);
}
}
使用fork-join来实现上述功能:
private static void findClassesByForkJoin(String packageName, String filePath, Set<Class<?>> classSet) {
//创建一个任务
FindFileTask task = new FindFileTask(new File(filePath), packageName);
ForkJoinPool pool = new ForkJoinPool();
//提交一个任务
Future<Set<Class<?>>> future = pool.submit(task);
try {
//将结果加入到classSet总集合中
classSet.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
log.error(e.toString());
}
}
private static class FindFileTask extends RecursiveTask<Set<Class<?>>> {
private File file; //文件
private String packageName; //包名
public FindFileTask(File file, String packageName) {
this.file = file;
this.packageName = packageName;
}
@Override
protected Set<Class<?>> compute() {
Set<Class<?>> classSet = new HashSet<>();
File[] files = file.listFiles(pathName -> pathName.isDirectory() || pathName.getName().endsWith(".class"));
if (files == null) {
return classSet;
}
String className;
Class<?> clz;
for (File file : files) {
if (file.isDirectory()) {
//如果是目录,就创建子任去执行。
FindFileTask task = new FindFileTask(file, packageName + "." + file.getName());
task.fork(); //创建子任务
//将结果加入到classSet集合里(在这里就是将集合加入到总集合中)
classSet.addAll(task.join());
} else {
//加载类
className = file.getName().substring(0, file.getName().length() - 6);
clz = loadClass(packageName + "." + className);
//将加载好的类加入到集合
classSet.add(clz);
}
}
return classSet;
}
}
上述是在我的个人项目LmServer中的某个代码片段。主要功能就是查找指定包名下的.class文件,并加载到JVM中。注释写得很清楚了,不再赘述。
小结
对于并发执行的任务,Executor框架是一种强大且灵活的框架,它提供了大量可配置的选项,根据具体的场景和需求来定制这些配置,可以大大提高并发程序的性能,但配置不当可能会造成一些奇奇怪怪的问题,所以必须认真对待这些配置。