JUC-(3)线程池(上)

2020-07-06  本文已影响0人  一个菜鸟JAVA

为什么要使用线程池

如果使用过阿里巴巴编码规约开发插件的人,当你通过new Thread()方式创建一个线程时,会有一个警告提示,内容如下:

警告
上面已经说得很明白了,为什么不推荐我们直接通过new Thread()这种方式来创建线程,而推荐线程池.总的来说有下面几点:

重要的类

名称 简介
ExecutorService 真正的线程池接口
ThreadPoolExecutor ExecutorService的实现类
ScheduledExecutorService 可安排在给定的延迟后运行或定期执行的命令
ScheduledThreadPoolExecutor ScheduledExecutorService的实现类

如何创建ThreadPoolExecutor

ThreadPoolExecutor提供了很多构造方法,我们找一个参数最多的构造方法来分析.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize

核心线程池大小.池中所保存的线程数,包括空闲线程.就是说如果你设置该值为5,但是目前你只有2个线程在活动,另外3个没事干,它也不会将另外三个销毁.可以理解为最小保留的线程数.所以该值不宜设置过大,应该根据实际业务逻辑来设置.如果设置太大,当峰值过去之后,大量线程空闲,造成资源浪费.需要注意的是,线程池创建完之后并不会立马就创建corePoolSize个线程,而是需要执行任务时才创建,有点"懒加载"的味道.

maximumPoolSize

池中允许的最大线程数.上面我们说过,如果你将corePoolSize设置过大,容易造成峰值过后过多线程空闲浪费资源.设置该值,我们可以让线程池在一个允许的时间内创建多一些的线程来支持更多的任务.需要注意的是,如果workQueue选择的是无界队列,这个值是不会生效的,后面将会讲到为什么不会生效.

keepAliveTime和unit

当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间.unit就是这个时间的单位.

workQueue

执行前用于保持任务的队列.简单的说就是线程池现在没有多余的线程来执行新的任务,它就会把任务暂时存在这个队列,等待有空的线程过来领取任务.这个队列的选择会影响到maximumPoolSize是否生效.我们常用的选择有ArrayBlockingQueue或者LinkedBlockingQueue.如果我们选择的是无界队列,maximumPoolSize参数将失效,后面会讲到为什么失效.

threadFactory

执行程序创建新线程时使用的工厂.线程池中的线程就是通过这个工厂类创建的.ThreadFacotry是一个接口,我们只需要实现newThread()方法即可,这个方法就是用来创建一个线程.线程池里面的线程就是通过调用这个方法来创建的.在jdk中ThreadPoolExecutor内部默认实现了一个DefaultThreadFactory类,如果我们没有设置,将默认使用这个创建新的线程.在实际开发中,我们还是比较推荐使用自己创建的线程池,例如我们可以设置线程名称,当程序出现异常了,有利于我们排查异常知道是哪个线程出现异常了.有一些第三方包也给我们提供了一些好用的实现,例如谷歌的Guava中的ThreadFactoryBuilder,使用起来也是很方便的.

/**
 * 简单的线程工厂
 */
class SimpleThreadFactory implements ThreadFactory{
    private AtomicInteger counter = new AtomicInteger(0);
    private String threadName;

    public SimpleThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(threadName+"-"+counter.incrementAndGet());
        return thread;
    }
}

上面就是一个ThreadFactory的简单实现,我们可以自定义设置线程的名称.

handler

由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序.当没有多余的线程来执行新任务,新任务将会被放到workQueue,单workQueue也满了无法存储新的任务,这个时候就会通过handler来处理新的任务.RejectedExecutionHandler是一个接口,它仅仅只需要实现rejectedExecution即可.而在ThreadPoolExecutor内部提供了四种实现:

    //这个会使用当前提交任务的线程去执行.
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //如果线程池没有关闭
            if (!e.isShutdown()) {
                //使用当前提交任务的线程去执行任务
                r.run();
            }
        }
    }
    //这个会直接拒绝报异常
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    //什么也没做,任务被抛弃了
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    //删除最老的任务
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //如果线程池没有关闭
            if (!e.isShutdown()) {
                //移除workQueue队列的头部任务.队列是FIFO,所以是删除最老的任务
                e.getQueue().poll();
                //使用线程池执行当前的任务
                e.execute(r);
            }
        }
    }

如果我们在创建线程池时没有设置处理策略,那么默认会使用的AbortPolicy策略,即直接抛出异常来处理.在实际的业务开发中,我们应该按自己实际的业务需求来设置该策略.如果默认的策略无法满足我们的要求,我们也可以实现自己的处理策略.

任务提交流程

之前我们说过workQueuee会影响maximumPoolSize参数是否生效,下图展示了当一个线程提交到线程池的过程:

任务被提交到线程的过程
  1. 任务提交到线程池首先会判断核心线程数是否超过设定值,如果没有超过则任务会被直接执行.
  2. 如果核心线程数满了,这时候是不会创建新的线程的,而是会先将线程放入workQueue.如果workQueue没满,它是不会再创建新的线程去处理当前的任务的.如果我们选用的队列是无界的或者没有设置最大长度,那么新的任务就会一直存入队列中,自然maximumPoolSize也不会生效.所以使用无界队列做缓冲是一个很危险的行为.如果任务的时间比较长,没有足够的线程去处理新任务,任务会一直堆压在队列中.时间越久越多,很容易就导致程序OOM.
  3. 如果workQueue已满,这个时候才会创建新的线程去执行任务.但是线程池中的最大线程个数是不能超过maximumPoolSize.如果超过该值,将会线程池创建时的处理策略来处理.

示例代码

public class App1 {
    public static void main(String[] args) {
        //设置workQueue长度为5
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
        //设置自己的ThreadFactory
        ThreadFactory factory = new SimpleThreadFactory("自定义线程");
        //设置核心线程池为2,最大为4.当达到最大线程数时,使用当前线程去处理新增加的任务
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60,
                TimeUnit.SECONDS, workQueue, factory, new ThreadPoolExecutor.CallerRunsPolicy());

        //一共提交15个任务
        for (int i = 0; i < 15; i++) {
            pool.submit(() -> {
                //打印当前是哪个线程在执行任务
                System.out.println(Thread.currentThread().getName() + ":正在执行中...");
                try {
                    //模拟任务耗时
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true) {

        }
    }
}

/**
 * 简单的线程工厂
 */
class SimpleThreadFactory implements ThreadFactory {
    private AtomicInteger counter = new AtomicInteger(0);
    private String threadName;

    public SimpleThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(threadName + "-" + counter.incrementAndGet());
        return thread;
    }
}

上面的代码打印结果如下:

main:正在执行中...
自定义线程-1:正在执行中...
自定义线程-2:正在执行中...
自定义线程-3:正在执行中...
自定义线程-4:正在执行中...
main:正在执行中...
自定义线程-1:正在执行中...
自定义线程-2:正在执行中...
自定义线程-3:正在执行中...
自定义线程-4:正在执行中...
自定义线程-1:正在执行中...
自定义线程-2:正在执行中...
自定义线程-4:正在执行中...
自定义线程-3:正在执行中...
自定义线程-1:正在执行中...

上面打印的线程一共有5个.一个为main线程,另外四个是线程池创建的.我们一共提交了15个任务,因为队列的最大长度为5,最大线程为4.我们持续提交任务到,导致没有足够的线程去执行新任务并且也没有足够的队列空间存储新任务.而我们设置的拒绝策略为CallerRunsPolicy,所以再提交新任务时,会使用当前提交任务的线程去执行(即main线程).

Executors

在我们写业务代码时,通过上面那种方式创建线程池没有问题,但是比较麻烦.所以JDK为我们提供了更简便的线程池创建方式.在这个类中主要提供了一下简便的创建线程池的方法.

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

它的源码中SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作,反之亦然.如果一个新的线程被提交到队列,立马就会有线程去处理.而处理的线程可能是之前创建的(之前创建的线程没有新任务需要处理)也可能是新创建的线程(没有线程是空闲的).

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

通过源码我们知道,该线程池中的线程个数是固定的.而且它使用了一个无界队列来存放来不及处理的线程.

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
上一篇下一篇

猜你喜欢

热点阅读