工作生活

[深入学习]JAVA线程池[转载]

2019-07-08  本文已影响0人  lconcise

通过new Thread来创建一个线程,由于线程的创建和销毁都需要消耗一定的CPU资源,所以在高并发下这种创建线程的方式将严重影响代码执行效率。而线程池的作用就是让一个线程执行结束后不马上销毁,继续执行新的任务,这样就节省了不断创建线程和销毁线程的开销

  1. ThreadPoolExecutor
  2. 关闭线程池
  3. 4种拒绝策略
    3.1 CallerRunsPolicy
    3.2 AbortPolicy
    3.3 DiscardOldestPolicy
    3.4 DiscardPolicy
  4. 线程池的工厂方法
    4.1 newFixedThreadPool
    4.2 newCachedThreadPool
    4.3 newSingleThreadPool
    4.4 newScheduleThreadPool
  5. ThreadPoolExecutor 一些API的用法

ThreadPoolExecutor

创建Java线程池最核心的类 ThreadPoolExecutor:


image.png

它提供了四种构造函数,最核心的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

这7个参数的含义如下:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
  1. corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即使这些线程是空闲的,也不会被销毁,除非通过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;
  2. maximumPoolSize 线程池中允许的最大线程个数;
  3. keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间还没有新任务的话,超出的线程将被销毁;
  4. unit 超时时间单位;
  5. workQueue 线程队列。用于保存通过execute方法提交的,等待被执行的任务;
  6. threadFactory 线程创建工程,即指定怎样创建线程;
  7. handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。

下面看个例子,加入理解:

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
        System.out.println("===  线程池创建完毕");

        int activeCount = -1;
        int queueSize = -1;
        while (true) {
            if (activeCount != threadPoolExecutor.getActiveCount()
                    || queueSize != threadPoolExecutor.getQueue().size()) {
                System.out.println("===  活跃线程个数: " + threadPoolExecutor.getActiveCount());
                System.out.println("===  核心线程个数: " + threadPoolExecutor.getCorePoolSize());
                System.out.println("===  队列线程个数: " + threadPoolExecutor.getQueue().size());
                System.out.println("===  最大线程数  : " + threadPoolExecutor.getMaximumPoolSize());
                System.out.println("-------------------------------------");
                activeCount = threadPoolExecutor.getActiveCount();
                queueSize = threadPoolExecutor.getQueue().size();
            }
        }
    }

上面的代码创建了一个核心线程数量为1,允许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。

我们通过execute方法向线程池提交一个任务。

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
        System.out.println("===  线程池创建完毕");

        threadPoolExecutor.execute(() -> sleep(10));

        int activeCount = -1;
        int queueSize = -1;
        while (true) {
            if (activeCount != threadPoolExecutor.getActiveCount()
                    || queueSize != threadPoolExecutor.getQueue().size()) {
                System.out.println("===  活跃线程个数: " + threadPoolExecutor.getActiveCount());
                System.out.println("===  核心线程个数: " + threadPoolExecutor.getCorePoolSize());
                System.out.println("===  队列线程个数: " + threadPoolExecutor.getQueue().size());
                System.out.println("===  最大线程数  : " + threadPoolExecutor.getMaximumPoolSize());
                System.out.println("-------------------------------------");
                activeCount = threadPoolExecutor.getActiveCount();
                queueSize = threadPoolExecutor.getQueue().size();
            }
        }
    }

    private static void sleep(long value) {
        try {
            System.out.println("===  " + Thread.currentThread().getName() + "线程执行sleep方法");
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

ThreadPoolExecutor的execute和submit方法都可以向线程池提交任务,区别是,submit方法能够返回执行结果,返回值类型为Future

启动线程,控制台如下输出:

===  线程池创建完毕
===  活跃线程个数: 1
===  Thread-0线程执行sleep方法
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------

10秒后,控制台如下输出:

===  线程池创建完毕
===  活跃线程个数: 1
===  Thread-0线程执行sleep方法
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------
===  活跃线程个数: 0
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------

线程核心线程数量为1,通过execute提交一个任务之后,由于核心线程是空的,所以任务被执行了,这个任务的逻辑是休眠10秒,所以这10秒内,线程池的活跃线程数量为1。此外,并没有线程需要放到线程队列里等待,线程队列长度为0,
10秒后任务执行完成,活跃线程个数变为0。

我们通过execute方法向线程池提交2个任务,看看结果如何:

        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));

启动线程,控制结果如下:

===  线程池创建完毕
===  Thread-0线程执行sleep方法
===  活跃线程个数: 1
===  核心线程个数: 1
===  队列线程个数: 1
===  最大线程数  : 2
-------------------------------------
===  Thread-0线程执行sleep方法
===  活跃线程个数: 1
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------
===  活跃线程个数: 0
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------

活跃个数为1,队列线程数为1,
10秒后,
活跃个数为1,队列线程数为0,
10秒后,
活跃个数为0,队列线程数为0。

我们通过execute方法向线程池提交3个任务,结果:

        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));

启动线程,控制台结果如下:

===  线程池创建完毕
===  Thread-0线程执行sleep方法
===  Thread-1线程执行sleep方法
===  活跃线程个数: 2
===  核心线程个数: 1
===  队列线程个数: 1
===  最大线程数  : 2
-------------------------------------
===  Thread-1线程执行sleep方法
===  活跃线程个数: 1
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------
===  活跃线程个数: 0
===  核心线程个数: 1
===  队列线程个数: 0
===  最大线程数  : 2
-------------------------------------

最大线程数为2,提交了3个任务,队列中也有等待任务,所以看到
活跃线程个数2,
队列线程个数1,
10秒后,
活跃线程个数1,
队列线程个数0,
10秒后,
活跃线程个数0,
队列线程个数1。

我们通过execute方法向线程池提交4个任务:

        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));
        threadPoolExecutor.execute(() -> sleep(10));

启动线程,控制台结果如下:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cc.mrbird.demo.MainDemo$$Lambda$5/159413332@3d494fbf rejected from java.util.concurrent.ThreadPoolExecutor@1ddc4ec2[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at cc.mrbird.demo.MainDemo.main(MainDemo.java:24)
===  线程池创建完毕
===  Thread-0线程执行sleep方法
===  Thread-1线程执行sleep方法
===  Thread-0线程执行sleep方法

因为我们设置的拒绝策略为AbortPolicy,所以最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。

关闭线程池

当线程池中所有任务都处理完毕后,线程并不会自己关闭。我们可以通过调用shutdown和shutdownNow方法来关闭线程池。两者的区别在于:

shutdown: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

启动有序关闭,在此过程中执行先前提交的任务,但不接受任何新任务。如果已经关闭,调用不会产生额外的效果。这种方便平滑的关闭线程池。

shutdownNow :Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. These tasks are drained (removed) from the task queue upon return from this method.

尝试停止所有正在积极执行的任务,停止处理等待的任务,并返回等待执行的任务列表。从该方法返回时,将从任务队列中删除这些任务。这种方法比较暴力。

通过代码去实践下:

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                4,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());

        threadPoolExecutor.execute(new ShortTask());
        threadPoolExecutor.execute(new LongTask());
        threadPoolExecutor.execute(new ShortTask());
        threadPoolExecutor.execute(new LongTask());

        threadPoolExecutor.shutdown();

        System.out.println("===  已经执行了线程的shutdown方法");
    }

    static class ShortTask implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("===  " + Thread.currentThread().getName() + " 执行shortTask完毕");
            } catch (InterruptedException e) {
                System.out.println("===  shortTask执行过程中被打断" + e.getMessage());
            }
        }
    }

    static class LongTask implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("===  " + Thread.currentThread().getName() + " 执行longTask完毕");
            } catch (InterruptedException e) {
                System.out.println("===  longTask执行过程中被打断" + e.getMessage());
            }
        }
    }

执行结果:

===  已经执行了线程的shutdown方法
===  Thread-0 执行shortTask完毕
===  Thread-0 执行shortTask完毕
===  Thread-1 执行longTask完毕
===  Thread-0 执行longTask完毕

可以看到,已经执行了shutdown()方法,并不会立即关闭线程池,而是等待所有被提交的任务都执行完了才关闭。

shutdownNow()例子:

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                4,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());

        threadPoolExecutor.execute(new ShortTask());
        threadPoolExecutor.execute(new LongTask());
        threadPoolExecutor.execute(new ShortTask());
        threadPoolExecutor.execute(new LongTask());

        List<Runnable> runnables = threadPoolExecutor.shutdownNow();
        System.out.println("** " + runnables);

        System.out.println("===  已经执行了线程的shutdown方法");
    }

    static class ShortTask implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("===  " + Thread.currentThread().getName() + " 执行shortTask完毕");
            } catch (InterruptedException e) {
                System.out.println("===  shortTask执行过程中被打断" + e.getMessage());
            }
        }
    }

    static class LongTask implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("===  " + Thread.currentThread().getName() + " 执行longTask完毕");
            } catch (InterruptedException e) {
                System.out.println("===  longTask执行过程中被打断" + e.getMessage());
            }
        }
    }

执行结果:

===  longTask执行过程中被打断sleep interrupted
** [cc.mrbird.demo.MainDemo2$ShortTask@27bc2616, cc.mrbird.demo.MainDemo2$LongTask@3941a79c]
===  已经执行了线程的shutdown方法
===  shortTask执行过程中被打断sleep interrupted

可以看到,在执行shutdownNow方法后,线程池马上被关闭,正在被执行的两个任务被打断,并且返回了线程队列中等待被执行的两个任务。

4种拒绝策略

当线程池无法接收新的任务的时候,可采取如下策略:


image.png

AbortPolicy策略: 丢弃任务,并抛出异常

上面那个例子就是AbortPolicy拒绝策略。

CallerRunsPolicy策略: 由调用线程处理该任务

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("===  线程池创建完毕");

        threadPoolExecutor.execute(() -> sleep(5));
        threadPoolExecutor.execute(() -> sleep(5));
        threadPoolExecutor.execute(() -> sleep(5));
        threadPoolExecutor.execute(() -> sleep(5));

    }

    private static void sleep(long value) {
        try {
            System.out.println("===  " + Thread.currentThread().getName() + "线程执行sleep方法");
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

执行结果:

===  线程池创建完毕
===  Thread-0线程执行sleep方法
===  main线程执行sleep方法
===  Thread-1线程执行sleep方法
===  Thread-1线程执行sleep方法

DiscardOldestPolicy策略:丢弃最早被放入到线程队列的任务,将新提交的任务放入到线程队列末端

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.DiscardOldestPolicy());
        System.out.println("===  线程池创建完毕");

        threadPoolExecutor.execute(() -> sleep(5,"任务一"));
        threadPoolExecutor.execute(() -> sleep(5,"任务二"));
        threadPoolExecutor.execute(() -> sleep(5,"任务三"));
        threadPoolExecutor.execute(() -> sleep(5,"任务四"));
    }

    private static void sleep(long value,String name) {
        try {
            System.out.println("===  " + Thread.currentThread().getName() + "线程执行sleep方法 "+name);
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

执行结果:

===  线程池创建完毕
===  Thread-0线程执行sleep方法 任务一
===  Thread-1线程执行sleep方法 任务三
===  Thread-0线程执行sleep方法 任务四

DiscardPolicy策略:直接丢弃新的任务,不抛异常

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.DiscardPolicy());
        System.out.println("===  线程池创建完毕");

        threadPoolExecutor.execute(() -> sleep(5,"任务一"));
        threadPoolExecutor.execute(() -> sleep(5,"任务二"));
        threadPoolExecutor.execute(() -> sleep(5,"任务三"));
        threadPoolExecutor.execute(() -> sleep(5,"任务四"));
    }

    private static void sleep(long value,String name) {
        try {
            System.out.println("===  " + Thread.currentThread().getName() + "线程执行sleep方法 "+name);
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

执行结果:

===  线程池创建完毕
===  Thread-0线程执行sleep方法 任务一
===  Thread-1线程执行sleep方法 任务三
===  Thread-0线程执行sleep方法 任务二

线程池的工厂方法

除了使用ThreadPoolExecutor的构造方法创建线程池外,我们也可以使用Executors提供的工厂方法来创建不同类型的线程池:

        ExecutorService executorService01 = Executors.newFixedThreadPool(5);
        ExecutorService executorService02 = Executors.newCachedThreadPool();
        ExecutorService executorService03 = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

newFixedThreadPool
查看newFixedThreadPool方法源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

可以看到,通过newFixedThreadPool创建的是一个固定大小的线程池,大小由nThreads参数指定,它具有如下几个特点:

  1. 因为corePoolSize和maximumPoolSize的值都为nThreads,所以线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。

  2. LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),所以这个线程池理论是可以无限的接收新的任务,这就是为什么上面没有指定拒绝策略的原因。

newCachedThreadPool
查看newCachedThreadPool方法源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这是一个理论上无限大小的线程池:

  1. 核心线程数为0,SynchronousQueue队列是没有长度的队列,所以当有新的任务提交,如果有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,否则新增一个线程来处理该任务。

  2. 因为线程数量没有限制,理论上可以接收无限个新任务,所以这里也没有指定拒绝策略。

newSingleThreadExecutor
查看newSingleThreadExecutor源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  1. 核心线程数和最大线程数都为1,每次只能有一个线程处理任务。
  2. LinkedBlockingQueue队列可以接收无限个新任务。
    newScheduledThreadPool
    查看newScheduledThreadPool源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
......

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

所以newScheduledThreadPool理论是也是可以接收无限个任务,DelayedWorkQueue也是一个无界队列。

使用newScheduledThreadPool创建的线程池除了可以处理普通的Runnable任务外,它还具有调度的功能:

  1. 延时指定时间后执行:
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // 延时5秒执行
        executorService.schedule(() -> System.out.println("==  Hello"), 5, TimeUnit.SECONDS);

2.按指定速率执行:

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // 按指定速率执行
        executorService.scheduleAtFixedRate(()->System.out.println(LocalDateTime.now()),3,5,TimeUnit.SECONDS);

执行结果:

2019-07-08T11:38:20.792
2019-07-08T11:38:25.776
2019-07-08T11:38:30.775
2019-07-08T11:38:35.773
2019-07-08T11:38:40.773
2019-07-08T11:38:45.774
  1. 按指定时延执行
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // 按指定时延执行
        executorService.scheduleWithFixedDelay(()->System.out.println(LocalDateTime.now()),3,5,TimeUnit.SECONDS);

执行结果:

2019-07-08T11:41:35.225
2019-07-08T11:41:40.228
2019-07-08T11:41:45.228
2019-07-08T11:41:50.230
2019-07-08T11:41:55.231
2019-07-08T11:42:00.232

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们还是有区别的:

  1. scheduleAtFixedRate按照固定速率执行任务,比如每5秒执行一个任务,即使上一个任务没有结束,5秒后也会开始处理新的任务;
  2. scheduleWithFixedDelay按照固定的时延处理任务,比如每延迟5秒执行一个任务,无论上一个任务处理了1秒,1分钟还是1小时,下一个任务总是在上一个任务执行完毕后5秒钟后开始执行。

对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:


image.png

因为这几个线程池理论是都可以接收无限个任务,所以这就有内存溢出的风险。实际上只要我们掌握了ThreadPoolExecutor构造函数7个参数的含义,我们就可以根据不同的业务来创建出符合需求的线程池。一般线程池的创建可以参考如下规则:

  1. IO密集型任务,线程池线程数量可以设置为2 X CPU核心数;
  2. 计算密集型任务,线程池线程数量可以设置为CPU核心数 + 1。

一些API的用法

ThreadPoolExecutor提供了几个判断线程池状态的方法:

    public static void main(String[] args) throws InterruptedException{
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        threadPoolExecutor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.shutdown();
        System.out.println("===  线程为shutdown状态: " + threadPoolExecutor.isShutdown());
        System.out.println("===  线程池正在关闭: " + threadPoolExecutor.isTerminating());
        System.out.println("===  线程池已经关闭: " + threadPoolExecutor.isTerminated());
        // 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
        threadPoolExecutor.awaitTermination(56,TimeUnit.SECONDS);
        System.out.println("===  线程池已经关闭: " + threadPoolExecutor.isTerminated());
    }

程序输出如下:

===  线程为shutdown状态: true
===  线程池正在关闭: true
===  线程池已经关闭: false
===  线程池已经关闭: true

前面我们提到,线程池核心线程即使是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了允许核心线程超时:

    public static void main(String[] args) throws InterruptedException{
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        threadPoolExecutor.allowCoreThreadTimeOut(true);

        threadPoolExecutor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("===  任务执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

5秒后任务执行完毕,核心线程处于空闲的状态。因为通过allowCoreThreadTimeOut方法设置了允许核心线程超时,所以3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有作用了,于是就自动关闭了。

值得注意的是,如果一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。

ThreadPoolExecutor提供了一remove方法,查看其源码:

    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        threadPoolExecutor.allowCoreThreadTimeOut(true);

        threadPoolExecutor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("===  任务执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Runnable runnable = () -> System.out.println("=== 看看我是否被删除");
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.remove(runnable);

        threadPoolExecutor.shutdown();
    }

程序输出:


image.png

可看到任务并没有被执行,已经被删除,因为唯一一个核心线程已经在执行任务了,所以后提交的这个任务被放到了线程队列里,然后通过remove方法删除。

默认情况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。我们可以通过调用prestartCoreThread方法,让核心线程即使没有任务提交,也处于等待执行任务的活跃状态:

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 2, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
        threadPoolExecutor.prestartCoreThread();
        System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
        threadPoolExecutor.prestartCoreThread();
        System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
        threadPoolExecutor.prestartCoreThread();
        System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
    }

程序输出:
···
=== 活跃线程个数: 0
=== 活跃线程个数: 1
=== 活跃线程个数: 2
=== 活跃线程个数: 2
···
该方法返回boolean类型值,如果所以核心线程都启动了,返回false,反之返回true。

还有一个和它类似的prestartAllCoreThreads方法,它的作用是一次性启动所有核心线程,让其处于活跃地等待执行任务的状态。

ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:

    public static void main(String[] args) throws Exception{
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 5, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        // 任务集合
        List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
            return i;
        }).collect(Collectors.toList());
        // 随机执行结果
        Integer result = threadPoolExecutor.invokeAny(tasks);
        System.out.println("------------------");
        System.out.println(result);
        threadPoolExecutor.shutdown();
    }

程序输出:

------------------
0

ThreadPoolExecutor的invokeAll则是执行任务集合中的所有任务,返回Future集合:

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 5, 3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new, new ThreadPoolExecutor.AbortPolicy());

        // 任务集合
        List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
            return i;
        }).collect(Collectors.toList());

        List<Future<Integer>> futures = threadPoolExecutor.invokeAll(tasks);
        futures.stream().map(f -> {
            try {
                return f.get();
            } catch (InterruptedException | ExecutionException e) {
                return null;
            }
        }).forEach(System.out::println);

        threadPoolExecutor.shutdownNow();
    }

程序输出如下:

0
1
2
3

总结如下方法:

方法 描述
allowCoreThreadTimeOut(boolean value) 是否允许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭
awaitTermination(long timeout, TimeUnit unit) 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
execute(Runnable command) 向线程池提交任务,没有返回值
submit(Runnable task) 向线程池提交任务,返回Future
isShutdown() 判断线程池是否为shutdown状态
isTerminating() 判断线程池是否正在关闭
isTerminated() 判断线程池是否已经关闭
remove(Runnable task) 移除线程队列中的指定任务
prestartCoreThread() 提前让一个核心线程处于活跃状态,等待执行任务
prestartAllCoreThreads() 提前让所有核心线程处于活跃状态,等待执行任务
getActiveCount() 获取线程池活跃线程数
getCorePoolSize() 获取线程池核心线程数
threadPoolExecutor.getQueue() 获取线程池线程队列
getMaximumPoolSize() 获取线程池最大线程数
shutdown() 让线程池处于shutdown状态,不再接收任务,等待所有正在运行中的任务结束后,关闭线程池。
shutdownNow() 让线程池处于stop状态,不再接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。

参考文章:
https://mrbird.cc/Java-Thread-Pool.html

上一篇下一篇

猜你喜欢

热点阅读