线程池
在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。
线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,从概念以及应用场景中,我们可以看出,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。
那么,我们应该如何创建一个线程池那?Java中已经提供了创建线程池的一个类:Executor
而我们创建时,一般使用它的子类:ThreadPoolExecutor
public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory,
RejectedExecutionHandler paramRejectedExecutionHandler) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
throw new IllegalArgumentException();
if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
throw new NullPointerException();
this.corePoolSize = paramInt1;
this.maximumPoolSize = paramInt2;
this.workQueue = paramBlockingQueue;
this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
this.threadFactory = paramThreadFactory;
this.handler = paramRejectedExecutionHandler;
}
下面一张图来更好的理解线程池和这几个参数:
image.png(1)如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务,即使线程池中的其他线程是空闲的;
(2)如果运行的线程个数等于或者大于corePoolSize且小于maximumPoolSize,则会将提交的任务存放到阻塞队列workQueue中;
(3)如果当前workQueue队列已满的话,则会创建新的线程来执行任务;
(4)如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理增量的任务
ThreadPoolExecutor
image.pngExecutor 框架结构(主要由三大部分组成)
- 任务(Runnable /Callable)
执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
- 任务的执行(Executor)
Executor包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
注意: 通过查看 ScheduledThreadPoolExecutor 源代码我们发现 ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService,正如我们下面给出的类关系图显示的一样。ThreadPoolExecutor 类描述:
//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService
ScheduledThreadPoolExecutor 类描述:
//ScheduledExecutorService实现了ExecutorService接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
问题:ScheduledThreadPoolExecutor 是什么设计模式
-
异步计算的结果
Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)
image.png
- 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
- 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable <T> task))。
- 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
-
最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
image.png
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
分析。以下是ThreadPoolExecutor状态控制的主要变量和方法:
//原子状态控制数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29比特位
private static final int COUNT_BITS = Integer.SIZE - 3;
//实际容量 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// runState存储在高位中
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 打包和解压ctl
// 解压runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 解压workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
// 打包ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池使用一个AtomicInteger的ctl变量将 workerCount(工作线程数量)和 runState(运行状态)两个字段压缩在一起 ,这种做法在在java源码里经常有出现,如在 ReentrantReadWriteLock 里就将一个int分成高16位和底16位,分别表示读锁状态和写锁状态。ThreadPoolExecutor里也是使用了同样的思想,表现得更加复杂。
ThreadPoolExecutor用3个比特位表示runState, 29个比特位表示workerCount。因此这里需要特别说明的是:确切的说,当最大线程数量配置为Integer.MXA_VAULE时,ThreadPoolExecutor的线程最大数量依然是2^29-1。
目前来看这是完全够用的,但随着计算机的不断发展,真的到了不够用的时候可以改变为AtomicLong。这如同32位系统时间戳会在2038年01月19日03时14分07秒耗尽一样,当以后我们的系统线程能够超过2^29-1时,这些代码就需要调整了。对于未来,无限可能。
思考一下为什么是29:3呢?
这是因为我们的运营状态有5种,向上取2次方数,2^3 = 8。所以必须要3个比特位来表示各种状态。
运行状态解释:
- RUNNING 运行态,可处理新任务并执行队列中的任务
- SHUTDOW 关闭态,不接受新任务,但处理队列中的任务
- STOP 停止态,不接受新任务,不处理队列中任务,且打断运行中任务
- TIDYING 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
- TERMINATED 结束态,terminated() 方法已完成
整个CTL的状态,会在线程池的不同运行阶段进行CAS转换。
ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到。
ThreadPoolExecutor 3 个最重要的参数:
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
ThreadPoolExecutor其他常见参数:
- keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
- unit : keepAliveTime 参数的时间单位。
- threadFactory :executor 创建新线程的时候会用到。
- handler :饱和策略。关于饱和策略下面单独介绍一下。
ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:
- ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
- ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
-
ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求
image.png
ThreadPoolExecutor 提交任务
public void execute(Runnable command)
public <T> Future<T> submit(Callable<T> task)
public Future<?> submit(Runnable task)
//执行集合中所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
//提交所有任务,返回第一个执行完的task的执行结果,其他的task结束。
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)