Java并发编程 线程池
1. 线程池的自我介绍
线程池的重要性
什么是池
复用我们的线程
控制我们资源的总量
如果不使用线程池,每个任务都新开一个线程处理
- 一个线程
- for循环创建线程
- 当任务数量上升到1000
public class EveryTaskOneThread {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Task());
thread.start();
}
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println("执行了任务");
}
}
}
这样开销太大,我们希望有固定数量的线程,来执行者1000个线程,这样就避开了反复创建并销毁线程所带来的开销问题。
为什么要使用线程池?
1.反复创建开销大
2.过多的线程会占用太多内存
解决以上两个问题的思路
- 用少量的线程-避免内存占用过多
- 让这部分线程都保持工作,且可以反复执行任务-避免生命周期的损耗
线程池的好处
- 加快响应速度
- 合理利用CPU和内存
- 统一管理
线程池适合应用的场景
- 服务器接收到大量请求时,使用线程池技术是非常合适的。
- 在开中,如果需要创建5个以上的线程,就可以使用线程池来管理。
2. 创建和停止线程池
线程池构造函数的参数
public ThreadPoolExecutor(
int corePoolSize, //第1个参数
int maximumPoolSize, //第2个参数
long keepAliveTime, //第3个参数
TimeUnit unit, //第4个参数
BlockingQueue<Runnable> workQueue, //第5个参数
ThreadFactory threadFactory, //第6个参数
RejectedExecutionHandler handler) { //第7个参数
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) // 第一处
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)//第二处
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
表示常驻核心线程数。如果等于0,则任务执行完成后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。
maximumPoolSize
表示线程池能够容纳同时执行的最大线程数。从上方的示例代码中第一处来看,必须大于或等于1。如果待执行的线程数大于此值,需要借助第5个参数的帮助。缓存在队列中。如果maximumPoolSize 与corePoolSize 相等,即是固定大小线程池。
keepAliveTime
表示线程池中的线程空闲时间,当空闲时间达到KeepAliveTime 值时,线程被销毁,直到剩下corePoolSize 个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程大于corePoolSize 时,keepAliveTime 才会起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 变量设置为ture时,核心线程超时后也会被回收。
TimeUnit
表示时间单位。keepAliveTime 的时间单位通常是TimeUnit.SECONDS。
workQueue
表示缓存队列。当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue 阻塞队列。
有3种最常见的队列类型
1)直接交换:SynchronousQueue(实际存不下任务)
2)无界队列:LinkedBlockingQueue
3)有界队列:ArrayBlockingQueue
threadFactory
表示线程工厂。它用来生产一组相同任务的线程。线程池的命名是通过给这个factory增加组名前缀来实现的。在虚拟机栈分析时,就可以知道线程任务是由哪个线程工厂产生的。
新线程是有ThreadFactory创建的, 默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY,那么就可以改变线程名、线程组、优先级、是否是守护线程等。
handler
表示执行拒绝策略的对象。当超过第5个参数workQueue的任务缓存区上限的时候,就可以通过该策略处理请求,这是一种简单的限流保护。
增减线程的特点
- 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
- 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
- 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
线程池里的线程数量设定为多少比较合适
- CPU密集型(加密、计算hash等):最佳线程数为CPU核心数1-2倍
- 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:
- 线程数=CPU核心数*(1+平均等待时间/平均工作时间)
3. 常见线程池的特点和用法
- newFixedThreadPool
采用LinkedBlockingQueue 容易发生OOM
public class FixedThreadPoolOOM {
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
VM 参数
-Xmx8m -Xms8m //将内存调小
image.png
-
newSingleThreadExecutor
采用LinkedBlockingQueue 容易发生OOM -
newCachedThreadPool
可缓存线程池
特点 无界线程池,具有自动回收多余线程的功能
maximumPoolSize 为 Integer.MAX_VALUE 容易发生OOM -
newScheduledThreadPool
maximumPoolSize 为 Integer.MAX_VALUE 容易发生OOM -
newWorkStealingPool
适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中
4. 停止线程池的正确方法
1.shutdown
将线程池状态置为SHUTDOWN,并不会立即停止:
- 停止接收外部submit的任务
- 内部正在跑的任务和队列里等待的任务,会执行完
- 等到第二步完成后,才真正停止
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
image.png
- shutdownNow()
将线程池状态置为STOP。企图立即停止,事实上不一定:
- 跟shutdown()一样,先停止接收外部提交的任务
- 忽略队列里等待的任务
- 尝试将正在跑的任务interrupt中断
- 返回未执行的任务列表
它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。
但是大多数时候是能立即退出的
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdownNow();
// List<Runnable> runnableList = executorService.shutdownNow();
boolean b = executorService.awaitTermination(9L, TimeUnit.SECONDS);
System.out.println(b);
System.out.println(executorService.isTerminated());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
}
image.png
3.isShutdown
线程池状态是否为SHUTDOWN
4.isTerminated
线程池程已停止
5.awaitTermination
- 等所有已提交的任务(包括正在跑的和队列中等待的)执行完
- 或者等超时时间到
- 或者线程被中断,抛出InterruptedException
然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)
5. 任务太多,怎么决绝?
拒绝时间
1.当Executor关闭时,提交新任务会被拒绝。
2.以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
线程池拒绝策略,通常有以下四种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
- 钩子方法,给线程池加点料
- 每个任务执行前后
- 日志、统计
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");
}
}
...
我被执行
我被执行
我被执行
我被执行
线程池被暂停了
线程池被恢复了
我被执行
我被执行
...
6. 实现原理、源码分析
线程池组成部分
- 线程池管理器
- 工作线程
- 任务队列
- 任务接口(Task)
线程池实现任务复用的原理
- 相同线程执行不同任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
线程池线程复用的主要代码如上;
具体分析如下:
如下图,假设是定长线程池定长为3,
1.那么线程池会根据核心线程的长度来创建线程,这样就固定了线程数量,
2.然后在runWorker的时候,while循环不停的去缓存队列中查找task;
3.有task,则运行runlable的run方法;
4.如果没有task,则执行 processWorkerExit(w, completedAbruptly),删除当前worker(移除workers中该worker的对象(HashSet<Worker> workers = new HashSet<Worker>();))以及更新workcount等操作
6. 线程池状态
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1
2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
8.使用线程池的注意点
- 避免任务堆积
- 避免线程数过度增加
- 排查线程泄露