JAVA线程池

2021-05-28  本文已影响0人  quanCN

简介

线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池与工作队列(任务队列)密切相关的,如图


好处

总体设计

Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor的UML类图如下


生命周期

为了解决Executor服务的生命周期问题,ExecutorServiceExecutor进行了扩展,添加了一些用于生命周期管理的方法,如下

public interface ExecutorService extends Executor {
    //关闭服务,会先完成以及提交的任务而不再接收新的任务
    void shutdown();
    //暴力关闭服务,尝试取消所有运行中的任务,并且不再启动队列中尚未开始的任务
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        
    //提交指定任务去执行
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    
    //执行给定的任务,返回所有个任务的结果
    <T> List<Future<T>> invokeAll(Collection<?extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
    
    //执行给定的任务,返回其中一个任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的生命周期有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED下图根据jdk8的源码,所画的生命周期流程图:

设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及部署系统的特性。例如对于计算密集型任务线程池大小设置为N_{cpu}+1通常能实现更优的利用率,对于I/O密集型操作线程池的规模应该更大。也可以通过设置不同大小的的线程池运行程序观察CPU利用率的水平,从而找到更优的线程池大小。
当然CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。

默认线程池

JAVA类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

配置ThreadPoolExecutor

ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并且根据自己 的需求来定制,构造函数如下(7大参数):

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory, RejectedExecutionHandler handler){...}

源码解读

execute()方法是Executor中定义的方法,是线程池的核心方法,ThreadPoolExecutor的实现如下

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看出大致分为三步

  1. 如果当前线程数小于corePoolSize,将创建一个新线程,并将command做为第一个任务。调用addWorker()方法原子性地检查运行状态和工作者线程数量,返回false来防止错误警报,该错误警报在不应该添加线程的情况下会增加线程。
  2. 如果一个任务成功入队,仍需要再次检查是否应该添加一个线程(因为可能距离上次检查有的线程已经死亡)或者线程池已经关闭。因此,重新检查状态后,工作者线程执行任务或者拒绝任务并且触发饱和策略
  3. 如果不能将任务入队,则尝试添加一个新线程。如果失败了,则代表已经停止或饱和了,所以拒绝这个任务,并且触发饱和策略
上一篇 下一篇

猜你喜欢

热点阅读