Java 并发编程

Java并发编程笔记(八):线程池的使用

2018-07-07  本文已影响7人  yeonon

线程池是提高线程复用的技术手段,熟悉使用线程池可以提高应用程序的并发性能,但有些配置反而导致性能的降低,故我们也应该需要了解线程池的相关配置以避免这种类型的问题。

一、在任务与执行策略之间的隐性耦合

Executor框架可以将提交任务和任务的执行策略解耦开来。但不是所有的任务都适合所有的执行策略,有些任务是需要明确指定其执行策略的,包括:

只有当任务都是通类型且相互独立的时,线程池的性能才能达到最佳,否则将可能造成“阻塞”,如果提交的任务依赖与其他任务,除非线程池无限大,否则将可能造成死锁。

线程饥饿死锁

在线程池中,如果一个任务依赖其他的任务,除非线程池无限大(这几乎是不可能的),否则将有可能造成死锁。例如在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor中,并等待这个被提交任务的执行结果,那么通常将引发死锁。因为线程只有一个,即使提交了任务,因为当先执行的任务正在等待这个刚刚提交的任务,所以提交的任务没有线程来执行,最终导致当前执行的任务永远无法获取到子任务的结果造成死锁。这种情况被称为“饥饿死锁”

运行时间较长的任务

如果一个任务运行时间较长,即使不会造成死锁,也会对应用程序的响应性造成不良影响。有一项技术可以缓解执行时间较长的任务造成的影响,即限定任务等待资源的时间,而不要无限制的等待。例如JAVA NIO里的Selector.select()方法有一个指定超时时间的版本,当超时事件发生时,select从阻塞中醒来,执行后面的逻辑。Netty利用这一点解决了NIO中臭名昭著的CPU空转问题。

二、设置线程池的大小

线程池的大小不是那么容易确定的,一般需要根据系统的环境,资源多少,有多少个CPU,多少内存,计算任务是CPU密集的,还是IO密集的等等条件来动态设置。过大或者过小都会造成资源的浪费或者过度竞争。

对于计算密集型的任务,在用用N个CPU的机器上,当线程池的大小为N+1时,一般能实现最优的利用率。而对于包含IO操作的任务来说,需要比N+1更多的线程,这是因为线程不会一直执行任务。对于IO密集型的任务来说,更加看重的时IO资源而不是CPU资源,故线程池的大小可以稍微根据系统要求调整。

三、配置ThreadPoolExecutor

Executors里的工厂方法会返回ThreadPoolExecutor实例,ThreadPoolExecutor是一个灵活的,稳定的线程池,允许进行各种定制。

如果默认的执行策略不满足需求,可以根据自己的需求对其进行定制,一般是通过此构造函数来定制一些参数,例如线程池大小,拒绝策略等。如下代码即ThreadPoolExecutor的通用构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

调节以上三个数值,可以使得应用程序拥有一定的伸缩性,提高程序的整体性能。但设置不当,可能会导致资源浪费或者过度竞争等。

在调用构造函数之后再定制ThreadPoolExecutor

我们可以先调用ThreadPoolExecutor的构造函数获得一个实例后,再调用一些set方法设置参数。ThreadPoolExecutor提供了如下图所示的set方法:


set方法

我个人认为先调用构造函数再定制ThreadPoolExecutor的最大意义是方便动态的调整参数,以提高并发应用程序的灵活性。

四、扩展ThreadPoolExecutor

ThreadPoolExecutor是可扩展的,查看源码可以发现其beforeExecute,afterExecute和terminated方法的方法体是空的,故其子类完全可以重写这些方法来实现一些功能,例如统计日志等,这有点类似于AOP技术(目前看来,仅仅最终功能是差不多,但是AOP主要是使用代理来实现的,会更加复杂一些)。

五、递归算法的并行化

递归算法代码通常比较简洁,但它其实是比较依赖上一步计算结果的,这一特性使得直接将单线程转换成多线程不是一个好主意。一般情况下,递归算法每一层都可以看做是一个小任务,整个递归可以看做是一个大任务,如果我们能对大任务进行合理的切分,那么就可以使用Fork-Join类似的框架来处理小任务,合并小任务的结果,从而得到最终结果。

例如,现在我们需要统计某个目录下的文件数量,直接写一个递归程序会非常简单,只需要判断当前的文件类型是否是目录,如果是目录,递归的调用该方法,否则将数量加1,最后返回数量即可。这是单线程的解法,如果是多线程的话,我个人会选择Fork-join框架来做这件事。Demo如下所示:

递归单线程查找给定包名下的所有.class文件

    private static void findClassesByNormal(String packageName, String filePath, Set<Class<?>> classSet) {
        File dir = new File(filePath);

        //如果该文件不存在或者不是目录,那么直接返回
        if (!dir.exists() || !dir.isDirectory()) {
            return;
        }

        //列出当前目录的所有符合要求的项
        File[] dirFiles = dir.listFiles(pathName -> pathName.isDirectory() || pathName.getName().endsWith(".class"));

        if (dirFiles == null || dirFiles.length == 0) {
            return;
        }


        String className;
        Class clz;

        for (File file : dirFiles) {
            if (file.isDirectory()) {
                //如果是目录,就递归进入目录
                findClassesByNormal(packageName + "." + file.getName(), filePath + "/" + file.getName(), classSet);
                continue;
            }

            //否则直接加载类文件
            className = file.getName().substring(0, file.getName().length() - 6);
            clz = loadClass(packageName + "." + className);

            classSet.add(clz);
        }

    }

使用fork-join来实现上述功能:

    private static void findClassesByForkJoin(String packageName, String filePath, Set<Class<?>> classSet) {
        //创建一个任务
        FindFileTask task = new FindFileTask(new File(filePath), packageName);

        ForkJoinPool pool = new ForkJoinPool();
        //提交一个任务
        Future<Set<Class<?>>> future = pool.submit(task);

        try {
            //将结果加入到classSet总集合中
            classSet.addAll(future.get());
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.toString());
        }
    }


    private static class FindFileTask extends RecursiveTask<Set<Class<?>>> {


        private File file; //文件
        private String packageName; //包名

        public FindFileTask(File file, String packageName) {
            this.file = file;
            this.packageName = packageName;
        }


        @Override
        protected Set<Class<?>> compute() {
            Set<Class<?>> classSet = new HashSet<>();

            File[] files = file.listFiles(pathName -> pathName.isDirectory() || pathName.getName().endsWith(".class"));

            if (files == null) {
                return classSet;
            }

            String className;
            Class<?> clz;
            for (File file : files) {
                if (file.isDirectory()) {
                    //如果是目录,就创建子任去执行。
                    FindFileTask task = new FindFileTask(file, packageName + "." + file.getName());
                    task.fork(); //创建子任务
                    //将结果加入到classSet集合里(在这里就是将集合加入到总集合中)
                    classSet.addAll(task.join());
                } else {
                    //加载类
                    className = file.getName().substring(0, file.getName().length() - 6);
                    clz = loadClass(packageName + "." + className);
                    //将加载好的类加入到集合
                    classSet.add(clz);
                }
            }
            return classSet;
        }
    }

上述是在我的个人项目LmServer中的某个代码片段。主要功能就是查找指定包名下的.class文件,并加载到JVM中。注释写得很清楚了,不再赘述。

小结

对于并发执行的任务,Executor框架是一种强大且灵活的框架,它提供了大量可配置的选项,根据具体的场景和需求来定制这些配置,可以大大提高并发程序的性能,但配置不当可能会造成一些奇奇怪怪的问题,所以必须认真对待这些配置。

上一篇下一篇

猜你喜欢

热点阅读