JAVA线程池
在学习线程之前需要先明确两点:
-
每一个线程,在某一时间段内只会执行一个任务
-
线程池中的线程是可以重复使用的
五种类型线程池的创建方式
-
Executor.newSingleThreadExecutor():创建只有一个线程的线程池(SingleThreadPool)如果往该线程池中提交多个任务,那么这些任务将会被同一个线程顺序执行
-
Executors.newCachedThreadPool():创建可以存放多个线程的线程池(CachedThreadPoll),这些线程可以同时执行,默认情况下,如果某个线程的空闲时间超过60s,那么此线程会被终止运行并从池中删除
-
Executors.newFixedThreadPool(线程数):创建拥有固定线程数的线程池,既然池中的线程数是固定的,那么即使某个线程长时间没有任务执行,那么也会一直等待,不会从池中删除,一般情况下,线程数应该和本级CPU的核数保持一直或整数倍,获取本级的CPU的核数的代码是Runtime.getRuntime().availableProcessors()
-
Executors.newSingleThreadScheduledExecutor():创建一个可用于任务调度的线程池,并且池中只有一个线程,任务调度是指可以让任务在用户指定的时间执行
-
Executors.newScheduledThreadPool():创建一个可用于任务调度的线程池,并且池中有多个线程
以上创建的线程池可以大致分为两类,分别是ExecutorService线程池和ScheduledExecutorService线程池,其中ScheduledExecutorService继承ExecutorService
ExecutorService线程池
ExecutorService线程池 可以通过submit()方法想ExecutorService中提交Callable或者Runable类型的线程任务,任务提交后就会被线程池中的线程领取并执行。线程执行完毕后,会将结果返回到Future对象中,具体提交如下 如果提交的是Callable类型的线程任务,线程通过callable中的call方法执行任务,并用future对象接收call方法的返回值 如果提交的是Runable类型的线程任务,线程通过Runable中的run方法执行任务,而run方法没有返回值,因此Future接收的值是NULL shutDown和shutdownNow是用于关闭线程池的方法,区别如下 shutDown方法将线程池里立刻变成shutdown状态,此时不能再往线程池中增加任务,否则抛出RejectedExecutionException异常,但是shutdown执行后,线程池不会立刻退出,而是会等待线程池中已有的任务全部处理完毕后在推出(即线程池中的所有线程执行完毕后,才会退出) shutdownNow 将线程池的状态乐可变成stop状态,立刻停止所有正在执行的线程,并且不再处理等待队列的任务,这些任务会以List<Runable>的形式,保存在返回值中
ScheduledExecutorService线程池
可以通过schedule()方法向ScheduledExecutorService中提交Callable或者Runable类型的线程任务,特殊的是,用户可以设置次线程池中的任务的执行时间
线程池的使用步骤
-
创建线程池
-
使用submit或者schedule提交到线程池中的线程
-
线程通过run()或call()方法执行任务,并将任务返回Future对象
-
Future通过get()方法以闭锁的方式获取真实处理后的返回值
案例:创建固定数量的线程池
//创建固定数量的线程池
ExecutorService executorService = Executors.newFixedThreadPool(6);
List<Future<Integer>> futureList=new ArrayList<>();
for(int i=0;i<6;i++){
Future<Integer> res = executorService.submit(() -> {
int sum=0;
for (int j = 0; j <=10 ; j++) {
sum+=j;
}
return sum;
});
futureList.add(res);
}
for(Future<Integer> stringFuture:futureList){
try {
//判断线程是否完成
System.out.println(stringFuture.isDone());
//获取返回值
System.out.println(stringFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
System.out.println("全部执行完毕");
线程池的流程控制案例
-
向线程池提交两个任务,每个任务依次执行以下操作:记录当前线程的启动时间,随机休眠2秒内,打印当前线程名,返回当前线程名
-
这两个任务不会立刻执行,而是等待一段时间(3S)后再执行
-
在本程序中,main线程会尝试获取这两个线程的结果,如果这两个子电池还没执行完则提示未完成,否则提示已完成并打印线程的返回值
public class ThreadTask implements Callable<String> {
private String tname;
@Override
public String call() throws Exception {
//获取当前线程的名字
String name=Thread.currentThread().getName();
long currentTimeMillis=System.currentTimeMillis();
System.out.println("线程"+tname+"启动时间"+currentTimeMillis);
Thread.sleep((long)Math.random()*2000);
System.out.println(name+"---"+tname+"正在执行");
return name+"===="+tname;
}
public ThreadTask(String name){
this.tname=name;
}
}
class TestPool{
public static void main(String[] args) throws ExecutionException, InterruptedException {
Future<String> res=null;
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<String>> result=new ArrayList<>();
for (int i = 0; i <2 ; i++) {
res=scheduledExecutorService.schedule(new ThreadTask("thread"+i), (long) 3, TimeUnit.SECONDS);
result.add(res);
}
for (Future<String> stringFuture:result){
System.out.println(stringFuture.isDone()?"已完成":"未完成");
System.out.println("线程执行完毕,返回的结果:"+stringFuture.get());
}
scheduledExecutorService.shutdown();
}
}
运行结果:
image-20210627093241410.png
运行结果分析:
-
如果TaskCallable线程还没有执行玩不,主线程就去执行res.isDone(),则会显示未完成
-
之后两个子线程先后启动执行
-
其中一个子线程执行完毕后返回结果
-
当第二个子线程执行完毕后,会显示已完成,并打印返回结果
自定义线程池的构建原理与案例详解
可以通过ThreadPoolExecutor的构造方法来创建一个自定义线程池对象,并且通过execute向池中提交无返回值的任务,或者使用submit()向池中提交有返回值的任务
image-20210627094608436.png其中,构造方法中各个参数的含义如下所述:
-
corePoolSize:线程池中核心线程数的最大值,核心线程是指一旦有任务提交,核心线程就会去执行
-
maximumPoolSize:线程池中最多能容纳的线程总数。线程总数=核心线程+非核心线程数。非核心线程是指如果有任务提交,首先会交给核心线程执行,如果核心线程满了再将任务放到workQueue中,如果workQueue也满了才将任务交给非核心线程去执行
-
keepAliveTime:线程中非核心线程的最大空闲时长。如果超过了该时间,空闲的非核心线程就会从线程池中被删除。如果设置了allowCoreThreadTimeOut=true,那么keepAliveTime也会作用于核心线程。
-
unit:keepAliveTime的时间单位
-
workQueue:等待提交到线程池中的任务队列。如果所有的核心线程都在执行,那么新添加的任务就会被添加到这个队列中等待处理;如果队列也满了,线程池就会创建非核心线程去执行这些无法添加到队列中的任务;如果向线程池中提交的任务数量大于maximumPoolSize+workQueue.size()就会出现异常 ,异常的类型取决于构造方法的最后一个参数handler
-
threadFactory:创建线程的方式,一般不用设置,使用默认值即可
-
handler:拒绝策略。当向线程提交的任务已满,如何拒绝超额的任务,拒绝策略的上级接口是RejectedExecutionHandler,该接口中定义了拒绝时执行的方法rejectedExecution,源码如下
image-20210627095935618.png
该接口的四个实现类,也就是有4种默认的拒绝策略,分别如下:
-
AbortPolicy:默认的拒绝策略,如果已经饱和,就丢掉超额的任务,并抛出RejectedExecutionException异常
-
DiscardPolicy:如果饱和,丢掉超额的任务,但不会抛出异常
-
DiscardOldestPolicy:队列是FIFO的结构,当采用此策略时,如果已经饱和就删除最早进入队列的任务,再将新任务追加到队尾
-
CallerRunsPolicy:如果饱和,新任务就不会去尝试添加到workQueue中,而是直接去调用execute(),是一种“急脾气”的策略,比如排队的时候遇到很多人排就直接插队到最前。
此外,还可以实现RejectedExecutionHandler自定义拒绝策略
public class ThreadMy implements Runnable{
private String threadName;
public ThreadMy(String threadName){
this.threadName=threadName;
}
@Override
public void run() {
try {
System.out.println(threadName);
System.out.println(threadName+"执行了1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class TestMyPool{
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new ThreadMy("t1"));
threadPoolExecutor.execute(new ThreadMy("t2"));
threadPoolExecutor.execute(new ThreadMy("t3"));
threadPoolExecutor.execute(new ThreadMy("t4"));
threadPoolExecutor.execute(new ThreadMy("t5"));
threadPoolExecutor.execute(new ThreadMy("t6"));
threadPoolExecutor.shutdown();
}
}
-
当只提交t1任务的时候,线程池中正好有一个核心线程,所以t1可以立即执行
-
当提交t1,t2两个线程的时候,由于核心线程只有一个,所以T1线程会立即执行,t2会进入workQueue等待,等待1s后线程1执行完毕后开始执行
-
如果提交t1-t4的线程的时候,由于任务队列可以容纳3个线程,所以执行逻辑和第二步一样
-
当提交t1-t5的时候,由于队列三个已满,此时就会使用非核心线程执行,t5,所以执行顺序是t1,t5,t2,t3,t4
-
当提交6个线程的时候,由于大于最大线程数(2)+workQueue队列(3),所以会出触发拒策略
此外,也可以使用无界队列,最大线程数只要比核心线程数大就可以