线程池 - ThreadPoolExecutor

2018-06-26  本文已影响0人  我要做大牛23333

在Java中一般我们会通过Executors来创建线程池,一般有四种:

  1. newCachedThreadPool
  2. newFixedThreadPool
  3. newSingleThreadExecutor
  4. newScheduledThreadPool

这四种创建方式其实万变不离其宗,最底层都是使用的ThreadPoolExecutor,我们先来看下ThreadPoolExecutor的构造函数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

总共有5个参数:

接下来我们挨个看下Executors里这几种构造函数到底是如何构造线程池的。

newCachedThreadPool
缓存线程池,可以理解为弹性线程池,个人认为比较适用于有特殊时段的高并发请求的场景。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

可以看到在线程池里默认是没有线程的,但却可以申请无限多的线程(只要系统资源够用),如果现成空闲60秒后会被自动回收,最后那个SynchronousQueue是一个阻塞队列,长度永远为0,peek等操作永远返回null,如果调用take或者put的话会被阻塞,直到有相应的生产者或者消费者,这个特性很关键,因为在判断是否需要创建新线程的时候回用到BlockQueue里的offer方法,如下:

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

首先判断目前的线程数是否小于corePoolSize,如果小于就直接新建一个,否则判断当前线程池是否仍在运行,这里说下ctl.get()这个方法,ctl是一个AtomicInteger类型,ctl.get()返回一个int类型,int类型一共4个字节,最高3位存储线程池的状态,后29位代表线程个数,一共有5种运行状态:

// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

如果仍在运行状态,则对队列执行offer操作,看能否插入成功,

boolean offer(E e);

成功返回true,失败返回false。好,关键的来了SynchronousQueue的特性是长度永远为0,并且它的offer方法默认是立刻返回,这是什么意思呢?就是说在执行offer的时候如果没有消费者等待则立刻返回false,表示插入失败
此时会走到最后的if-else执行addWorker,addWorker会判断是否达到maximumPoolSize,没有的话就新建,如果无法再申请资源或达到上限则拒绝操作执行reject(command)
这是newCachedThreadPool为何可以不停地申请资源创建线程的原因。

newFixedThreadPool
顾名思义,就是固定大小的线程池,老规矩,先看构造函数:

new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

可以看到corePoolSizemaximumPoolSize是一样的,不可伸缩,超时时间可以忽略了,因为不会再新建线程,最后的BlockingQueue用的是LinkedBlockingQueue,同学们注意了,这个队列和前面提到的SynchronousQueue有啥区别呢?这很关键,因为当决定是否新建线程还是放到等待队列里时就是看这个queue还能否插入,我们来看下LinkedBlockingQueue的实现:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
 }

可以看到默认队列长度是最大值,由于offer方法太复杂所以就不放出来了,但效果是判断是否达到队列最大长度,如果没有那么就把任务先加入队列。
所以这就是区别,在没有可用线程的时候线程池会不断地把任务放到LinkedBlockingQueue中等待可用线程

newSingleThreadExecutor
单线程线程池,如果所执行的任务抛出异常或者终止,该线程池会自动新建一个线程保证服务的可用。好处是可以保证任务是串行的,不用考虑并发同步等复杂的问题。

new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

配置和上一个类似,只是线程数限制为1。FinalizableDelegatedExecutorService对ExecutorService做了一层封装,这里不详述了。

newScheduledThreadPool
用于执行定时任务

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());

可以看到这里使用了DelayedWorkQueue,在提交任务的时候

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

这里使用了DelayedWorkQueue的特性,在取操作的时候会block直到delay到期。由于本次没有详细研究DelayedWorkQueue所以不多做介绍,有兴趣的同学可以自己读下源码。
在《Effective Java II》和《Java并发编程实践》里都强调了在执行定时任务的时候最好使用线程池,而不要用Timer,因为线程池可以很好的维护线程状态,确保不会异常退出造成混乱。

总结

在使用线程池的时候最好使用Executors封装好的方法,如果想要定制化可以参考ThreadPoolExecutor的构造函数,自定义参数,切记选择合适的队列类型。相信在了解了底层实现原理后对线程池的控制会更有自信。

1_m7BOUXlstifMXvHY5BXHjw.jpeg
上一篇 下一篇

猜你喜欢

热点阅读