线程池2
2018-09-19 本文已影响0人
永远的太阳0123
-
线程池
所有的线程池中都有一个任务队列,使用的是BlockingQueue<Runnable>作为任务的载体。
active threads - 正在执行的任务
queued tasks - 任务队列
completed tasks - 结束的任务 -
FixedThreadPool
容量固定的线程池。创建时需要传入线程池的核心容量(也是最大容量)。
使用场景:大多数情况下使用的线程池都是FixedThreadPool。OS系统和硬件有线程支持的上限,不能随意地、无限制地提供线程池。线程池默认的容量上限是Integer.MAX_VALUE。
常见的线程池容量:PC - 200。服务器:1000~10000。
示例:
public class Test3_FixedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}
// 打印线程池信息
System.out.println(executorService);
// 优雅关闭。不是强行关闭,而是不再处理新的任务,将已接收的任务处理完毕后关闭
executorService.shutdown();
// 判读是否已经结束, 相当于回收了资源。
System.out.println(executorService.isTerminated());
// 判断是否已经关闭, 即是否调用过shutdown方法
System.out.println(executorService.isShutdown());
// 打印线程池信息
System.out.println(executorService);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
}
}
结果:
java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-3 - test executor
pool-1-thread-1 - test executor
pool-1-thread-5 - test executor
pool-1-thread-2 - test executor
pool-1-thread-4 - test executor
pool-1-thread-3 - test executor
true
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
- CachedThreadPool
没有容量限制的线程池。核心容量为0,最大容量为Integer.MAX_VALUE,生命周期为60秒。
容量管理策略:如果线程池中的线程空闲达到60秒,线程自动释放。新任务优先使用空闲的线程,如果没有空闲的线程,则新建线程。例如,当执行第二个任务时第一个任务已经执行完毕,会复用执行第一个任务的线程,而不是新建线程。
应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。 测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool容量的指导。
示例一:
public class Test {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
cachedThreadPool.shutdown();
}
}
结果:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
示例二:
public class Test4_CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println(executorService);
for (int i = 0; i < 5; i++) {
executorService.execute(new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}
System.out.println(executorService);
try {
TimeUnit.SECONDS.sleep(65);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(executorService);
}
}
结果:
java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-1 - test executor
pool-1-thread-2 - test executor
pool-1-thread-4 - test executor
pool-1-thread-3 - test executor
pool-1-thread-5 - test executor
java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 5]
- ScheduledThreadPool
计划任务线程池,即可以自动执行计划任务的线程池。
创建时需要传入线程池的核心容量,最大容量是Integer.MAX_VALUE,生命周期永久。
scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法:
(1)runnable - 要执行的任务。
(2)start_limit - 程序开始执行后,时隔多久第一次执行任务。
(3)limit - 多次执行任务的时间间隔。可以理解为隔了limit,下一个任务开始启动。
(4)timeunit - 间隔时间的时间单位。
使用场景: 执行计划任务时,ScheduledThreadPool和DelaydQueue有选择地使用。例如:电信行业中的数据整理,每分钟整理,每小时整理,每天整理等。
示例:
public class Test5_ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
System.out.println(scheduledExecutorService);
// 定时完成任务。 scheduleAtFixedRate(runnable, start_limit, limit, timeunit)方法
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}, 0, 300, TimeUnit.MILLISECONDS);
}
}
结果:
java.util.concurrent.ScheduledThreadPoolExecutor@70dea4e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
······
- SingleThreadExceutor
核心容量和最大容量都为1的线程池,生命周期永久。
使用场景:保证任务顺序时使用。例如: 游戏大厅中的公共频道聊天,秒杀。
示例:
public class Test6_SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
System.out.println(executorService);
for (int i = 0; i < 5; i++) {
executorService.execute(new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}
}
}
结果:
java.util.concurrent.Executors$FinalizableDelegatedExecutorService@70dea4e
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
pool-1-thread-1 - test executor
- ForkJoinPool
ForkJoinPool - 分支合并线程池(mapduce类似的设计思想)。 可以使用递归完成复杂任务。要求可分支合并的任务必须是ForkJoinTask类型的子类型。ForkJoinTask类型提供了两个抽象子类型,RecursiveTask有返回结果的分支合并任务,RecursiveAction无返回结果的分支合并任务。
compute方法:任务的执行逻辑。
ForkJoinPool没有所谓的容量。默认都是1个线程。根据任务自动的分支新的子线程。当子线程任务结束后,自动合并。所说的自动分支和自动合并是根据fork和join两个方法实现的。
使用场景: 主要是做科学计算或天文计算的。数据分析的。
示例:
public class Test7_ForkJoinPool {
final static int[] numbers = new int[1000000];
final static int MAX_SIZE = 50000;
final static Random r = new Random();
static {
for (int i = 0; i < numbers.length; i++) {
numbers[i] = r.nextInt(1000);
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
// 普通方法
long result = 0L;
for (int i = 0; i < numbers.length; i++) {
result += numbers[i];
}
System.out.println(result);
// 使用ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
AddTask task = new AddTask(0, numbers.length);
Future<Long> future = pool.submit(task);
System.out.println(future.get());
}
}
class AddTask extends RecursiveTask<Long> {
int begin;
int end;
public AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
protected Long compute() {
if ((end - begin) < Test7_ForkJoinPool.MAX_SIZE) {
long sum = 0L;
for (int i = begin; i < end; i++) {
sum += Test7_ForkJoinPool.numbers[i];
}
// System.out.println("form " + begin + " to " + end + " sum is : " + sum);
return sum;
} else {
int middle = begin + (end - begin) / 2;
AddTask task1 = new AddTask(begin, middle);
AddTask task2 = new AddTask(middle, end);
task1.fork();// 分支新的子线程。
task2.fork();
return task1.join() + task2.join();// 合并。join是一个阻塞方法,一定会获取到任务的结果
}
}
}
- ThreadPoolExecutor
除了ForkJoinPool,四种常用的线程池底层都是使用ThreadPoolExecutor实现的。
其中一个构造方法:public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
(1)int corePoolSize - 核心容量。线程池中至少保持多少线程,核心线程不会被回收。
(2)int maximumPoolSize - 最大容量。线程池中最多含有多少线程。
(3)long keepAliveTime - 生命周期,即当线程空闲多久之后自动回收。0为永久。
(4)TimeUnit unit - 声明周期的单位。
(5)BlockingQueue<Runnable> workQueue - 任务队列,阻塞队列,它的泛型必须是Runnable。
使用场景: 默认提供的线程池不满足条件时使用。例如:初始线程数4,最大线程数200,线程空闲周期30秒。
示例:
public class Test8_ThreadPoolExecutor {
public static void main(String[] args) {
// 模拟fixedThreadPool。 核心线程5个,最大容量5个,线程的生命周期永久。
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 6; i++) {
service.execute(new Runnable() {
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - test executor");
}
});
}
System.out.println(service);
service.shutdown();
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(service.isTerminated());
System.out.println(service.isShutdown());
System.out.println(service);
}
}
结果:
java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-1 - test executor
pool-1-thread-4 - test executor
pool-1-thread-5 - test executor
pool-1-thread-2 - test executor
pool-1-thread-3 - test executor
pool-1-thread-1 - test executor
true
true
java.util.concurrent.ThreadPoolExecutor@42a57993[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]