ThreadPool线程池浅析(代码讲解)
Case1(Executor):
public class T01_MyExecutor implements Executor {
public static void main(String[] args) {
new T01_MyExecutor().execute(()->{
System.out.println("hello executor");
});
}
@Override
//Executor就是定义一个接口,重写execute方法来执行一项任务
public void execute(Runnable command){
new Thread(command).run();
//command.run();
}
}
/*====================output========================
hello executor
*/
Executor是一个接口,重写execute方法来执行一项任务。
Case2(ExecutorService):
ExecutorService 继承自Executor,除了继承自父类的execute方法之外,还重写了submit方法。ExecutorService 相当于后台的服务,等着你往里面扔任务,除了execute方法之外还有submit。
submit里面除了可以往里面扔Runnable之外还可以往里面扔callable,当你需要一个线程运行完有返回值的时候,用callable,不需要则用没有返回值的Runnable。
Case3(Callable):
Callable是对Runnable进行了扩展,Runnable里面有一个run()方法,Callable里面有一个call()方法,其和run()类似。是被其他线程进行执行的方法。主要的区别是:一个有返回值一个没有。当你需要一个线程运行完有返回值的时候,用callable,不需要则用Runnable。
Case4(Executors):
Executors是一个实现类,而不是接口。区别于Executor,Executor是一个接口,Executors是一个工具类,操作Executor的一个工具类。
Case5(线程池的概念 && newFixedThreadPool):
线程池的概念:维护一堆线程,装在某个容器里等着任务来运行,线程池里面的线程执行完任务之后并不会消失(空闲),新的任务来啦,不需要启动新的线程。如果线程池里面的线程都忙不过来,则把“任务”扔到一个队列里面,等着线程池里面的线程来拿。
线程池维护的队列:
- 未执行的任务队列
- 完成的任务队列
线程池创建的常用三种方式:
newFixedThreadPool
:使用LinkedBlockingQueue实现,定长线程池。
newSingleThreadExecutor
:使用LinkedBlockingQueue实现,一池只有一个线程。
newCachedThreadPool
:使用SynchronousQueue实现,变长线程池。
- 附加几种:
- newScheduledThreadPool:执行定时的任务,里面的线程是可以复用
- newWorkStealingPool:工作窃取线程池
- ForkJoinPool:任务可以自动进行切分
查看线程池维护的队列Demo:
package ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class T05_ThreadPool {
public static void main(String[] args) throws InterruptedException {
//ExecutorService是接口;Executors是工具类;(子类指向父接口)。newFixedThreadPool是用来创建固定线程个数的线程池,里面线程个数为5
ExecutorService service= Executors.newFixedThreadPool(5); //向ExecutorService里面扔任务的方法:execute submit
for (int i = 0; i < 6; i++) {
//线程池的五个线程“抢着”执行六个方法
service.execute(()->{
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
//线程干完活就关掉线程池服务。shutdownNow()不管你干没干完,直接关掉
service.shutdown();
System.out.println(service.isTerminated()); //isTerminated:是否都执行完啦
System.out.println(service.isShutdown()); //是否关闭,关闭不代表执行完啦,有可能正在执行之中等着关闭
System.out.println(service);
//主线程睡5秒钟
TimeUnit.SECONDS.sleep(5);
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}
/*
* 运行结果:
* java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-5
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-5
true
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
* */
由上面的程序可以看出,线程池维护了两个队列:queued tasks和completed tasks。newFixedThreadPool是用来创建固定线程个数的线程池,上面代码中里面线程个数为5。
Case6(Future && FutureTask):
FutureTask是Future 的一个实现类,其是可取消的异步计算。具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。 结果只能在计算完成后才能检索; 如果计算尚未完成,则get()
方法将阻止。 一旦计算完成,则无法重新启动或取消计算(会使用之前的计算结果。除非使用runAndReset()
调用计算)
FutureTask,未来的任务,有一个返回值,比如FutureTask<Integer>,任务的返回值是Integer类型,FutureTask里面可以装一个Callalbe类型(有返回值,Integer call(); ),把Callable包装成一个FutureTask,FutureTask继承自RunnableFuture接口。所以FutureTask相当于一个适配器,使得可以使用Callable来创建线程(Thread类没有提供传入Callable的构造函数,Thread的构造方法里面不能接受Callable接口)。
package ThreadPool;
import java.util.concurrent.*;
public class T06_Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> task=new FutureTask<>(()->{ //把Callable包装成一个FutureTask
TimeUnit.MILLISECONDS.sleep(500);
return 1000;
});
new Thread(task).start();
System.out.println(task.get());//阻塞
ExecutorService service= Executors.newFixedThreadPool(5);
//Future拿到未来的一个返回值
Future<Integer> f=service.submit(()->{
TimeUnit.MILLISECONDS.sleep(500);
return 1;
});
System.out.println(service);
System.out.println(f.isDone());
System.out.println(f.get()+task.get()); //get是阻塞式的,建议把futureTask.get()放最后,因为其要求获得Callable线程计算的结果,如果计算没有完成就会强求,会导致阻塞,直到计算完成
System.out.println(f.isDone());
}
}
/*====================output========================
1000
java.util.concurrent.ThreadPoolExecutor@214c265e[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
false
1001
true
*/
get是阻塞式的,建议把futureTask.get()放最后,因为其要求获得Callable线程计算的结果,如果计算没有完成就会强求,会导致阻塞,直到计算完成。
Case7(newCachedThreadPool):
package ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class T08_CachedPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service= Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 2; i++) {
service.execute(()->{
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
TimeUnit.SECONDS.sleep(80);
System.out.println(service);
}
}
/*
* 等待了80s之后,会发现线程池里面的线程没啦,tasks变为了2.
*java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
*
* */
newCachedThreadPool,刚开始一个线程都没有,来一个任务就起一个线程,如果来一个任务时线程池里面有空闲的线程,则立即执行,如果来一个任务线程池里面没有空闲的线程,则再起一个线程,线程的最大数量直到你的内存崩了之后(闲置的线程超过60s[alive time]后就自动销毁了)。
Case8(newSingleThreadExecutor):
package ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class T09_SingleThreadPool {
public static void main(String[] args) {
ExecutorService service= Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j=i;
service.execute(()->{
System.out.println(j+" "+Thread.currentThread().getName());
});
}
}
}
线程池里面就有一个线程,即使任务多也就是能一个。他的作用是保证任务前后是顺序执行的。既然只有一个线程,为什么不直接使用new Thread呢?是因为他没有每次都创建线程的开销。
Case9(newScheduledThreadPool):
package ThreadPool;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class T10_SchedulePool {
public static void main(String[] args) {
ScheduledExecutorService service= Executors.newScheduledThreadPool(4);
//以固定的频率来执行某个任务:参数列表:Runnable command,
// long initialDelay,
// long period,
// TimeUnit unit
service.scheduleAtFixedRate(()->{
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
},0,500,TimeUnit.MILLISECONDS); //第一个任务马上起来执行,每隔500ms任务重复执行
}
}
/*====================output========================
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
......
*/
用来执行定时的任务,里面的线程是可以复用的。上面的代码中,打印任务马上起来执行,并且会每隔500ms任务重复执行。并且每次执行任务的线程也不唯一,线程也是可以复用的。
Case10(newWorkStealingPool):
package ThreadPool;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static javafx.scene.input.KeyCode.R;
public class T11_WorkStealingPool {
public static void main(String[] args) throws IOException {
ExecutorService service= Executors.newWorkStealingPool();
//看自己的机器的核数
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
//由于产生的精灵线程(守护线程、后台线程,daemon),主线程不阻塞的话,看不到输出
System.in.read();
}
static class R implements Runnable{
int time;
R(int t){
this.time=t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time+" "+Thread.currentThread().getName());
}
}
}
/*====================output========================
8
1000 ForkJoinPool-1-worker-1
2000 ForkJoinPool-1-worker-4
2000 ForkJoinPool-1-worker-3
2000 ForkJoinPool-1-worker-2
2000 ForkJoinPool-1-worker-5
*/
工作窃取线程池,每个线程都维护一个自已的任务队列,若某个线程自己的任务队列空了之后,回去其他的线程的任务队列中去偷任务来执行(主动找活干,以前是得分配)。线程池中的每个线程都是守护线程(daemon)。
Case11(ForkJoinPool):
package ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class T12_ForkJoinPool {
static int[] nums=new int[1000000];
static final int MAX_NUM=50000; //每一个小任务不超过50000个
static Random r=new Random();
//对1000000个数算总和
static {
for (int i = 0; i < nums.length; i++) {
nums[i]=r.nextInt(100);
}
//stream API,内部就是for循环,把这些加起来
System.out.println(Arrays.stream(nums).sum());
}
//RecursiveAction是没有返回值的,无法做总和的计算
/*static class AddTask extends RecursiveAction{
int start,end;
AddTask(int s,int e){
this.start=s;
this.end=e;
}
@Override
protected void compute() {
//如果分给某个线程的任务数小于MAX_NUM),就不再进行fork啦
if (end-start<=MAX_NUM){
long sum=0L;
for (int i = start; i < end; i++) {
sum+=nums[i];
}
System.out.println("from:"+start+" to:"+end+" = "+sum);
}else{
int middle=start+(end-start)/2; //如果大于MAX_VALUE,就切一半
AddTask subTask1=new AddTask(start,middle); //类似于递归,递归的过程是ForkJoinPool帮助维护的
AddTask subTask2=new AddTask(middle,end); //类似于递归
subTask1.fork(); //只要一fork,就会有新的线程启动
subTask2.fork();
}
}
}*/
static class AddTask extends RecursiveTask<Long>{
int start,end;
AddTask(int s,int e){
this.start=s;
this.end=e;
}
@Override
protected Long compute() {
//如果分给某个线程的任务数小于MAX_NUM),就不再进行fork啦
if (end-start<=MAX_NUM){
long sum=0L;
for (int i = start; i < end; i++) {
sum+=nums[i];
}
return sum;
}
int middle=start+(end-start)/2; //如果大于MAX_VALUE,就切一半
AddTask subTask1=new AddTask(start,middle); //类似于递归,递归的过程是ForkJoinPool帮助维护的
AddTask subTask2=new AddTask(middle,end);
subTask1.fork(); //只要一fork,就会有新的线程启动
subTask2.fork();
//RecursiveTask具有返回值
return subTask1.join()+subTask2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp=new ForkJoinPool();
//大任务
AddTask task=new AddTask(0,nums.length);
//直接做计算
fjp.execute(task);
//也可以直接使用task.join拿到结果,join是阻塞的,不用再System.in.read();
//long result=task.join();
//System.out.println(result);
System.in.read();
}
}
/*====================output========================
49548337
*/
fork:分叉,join:合并。ForkJoin 和 MapReduce(大数据)有点像。如果有一个非常大的大任务,则可以把这些切分成一个一个的小任务(如果小任务还是太大,则可以继续分,至于分成多小,可以自己指定),分完之后把结果进行合并,产生总的结果。
应用场景:大规模的数据计算
ForkJoinPool里面执行的任务必须是ForkJoinTask,这个任务可以自动进行切分。切分的指定通常继承RecursiveTask(有返回值) or RecursiveAction(无返回值)来进行切分。ForkJoin 的切分类似于递归,递归的过程是ForkJoinPool帮助维护的。
注意:
除了ForkJoinPool,其他线程池都是用的ThreadPoolExecutor这个类,如果想定义自己的线程池,可以使用ThreadPoolExecutor传入相关的参数(7个参数)。
详见:https://www.jianshu.com/p/8dcc81abde4d