ThreadPoolExecutor
1 创建线程池
通过ThreadPoolExecutor创建线程池:
ExecutorService executorService = new ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
BlockingQueue<Runnable> workQueue)
corePoolSize:核心线程的数量。
maximumPoolSize:线程池能够容纳的最大线程数量。
- 线程池创建线程的策略,线程池中的线程数用n表示
(1) 当n<corePoolSize时,创建核心线程,并执行。
(2)当corePoolSize<n<maximumPoolSize时,有两种策略:
a:当队列queue未满时,将新线程插入队列中,等待核心线程完成后有空闲的核心线程
时,出队执行;
b:当队列queue已满时,会创建非核心线程执行,这里就与keepAliveTime这个参数有关了,
如果keepAliveTime>0那么在非核心线程空闲后,将可以接收到新的线程执行。
(3)当n>maximumPoolSize时,会采用拒绝策略,拒绝新线程的到来。
下面是自己写的一个小例子:
import java.util.concurrent.*;
public class ReentrantLockCondition {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 8,
1L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(2));
/**
* 第一次插入5个线程时,因为线程数小于corePoolSize(=5),因此会创建5个核心线程并立即执行
*/
System.out.println("插入5个核心线程");
for(int i=0;i<5;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第" + index + "个核心线程:" + System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 第二次插入2个线程时,这个时候的线程数会全部进入queue队列中,插入2个后,ArrayBlockingQueue队列就已满了
*/
System.out.println("插入2个线程进入队列");
for(int i=0;i<2;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第" + index + "个线程走出队列:" + System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 再次插入三个线程,此时核心线程因为sleep了10s,因此没有结束,队列中的线程也还存在并且是满的,
* 同时,又因为maximumPoolSize=8>corePoolSize=5,因此会创建非核心线程并执行
*/
System.out.println("插入3个非核心线程");
for(int i=0;i<3;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第" + index + "个非核心线程:" + System.currentTimeMillis() +
":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 休眠7s钟后,非核心线程在sleep了5s中后,再经过keepAliveTime=1s,非核心线程被销毁,
* 7s后,此时线程池中的线程数只为5
*/
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 因为经过7s后,非核心线程已经被销毁,因此又可以再次创建新的非核心线程了
*/
System.out.println("再次插入3个非核心线程");
for(int i=0;i<3;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第二次插入:第" + index + "个非核心线程:" + System.currentTimeMillis() +
":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 此时,线程池中的线程数为8并且达到了maximumPoolSize的上限,队列也满了,再进入线程时,线程池将会采用拒绝策略
*/
System.out.println("第三次插入3个非核心线程");
for(int i=0;i<3;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第三次次插入:第" + index + "个非核心线程:" + System.currentTimeMillis() +
":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
运行结果:
插入5个核心线程
第2个核心线程:1515050431718:Thread ID:13
第0个核心线程:1515050431718:Thread ID:11
第1个核心线程:1515050431718:Thread ID:12
第3个核心线程:1515050431718:Thread ID:14
插入2个线程进入队列
第4个核心线程:1515050431718:Thread ID:15
插入3个非核心线程
第0个非核心线程:1515050431759:Thread ID:16
第1个非核心线程:1515050431759:Thread ID:17
第2个非核心线程:1515050431759:Thread ID:18
第0个线程走出队列:1515050436758:Thread ID:18
第1个线程走出队列:1515050436758:Thread ID:17
再次插入3个非核心线程
第二次插入:第2个非核心线程:1515050439777:Thread ID:19
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.chapter2.ReentrantLockCondition$5@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@7adf9f5f[Running, pool size = 8, active threads = 8, queued tasks = 2, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
第三次插入3个非核心线程
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.example.chapter2.ReentrantLockCondition.main(ReentrantLockCondition.java:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
第二次插入:第0个非核心线程:1515050441715:Thread ID:13
第二次插入:第1个非核心线程:1515050441715:Thread ID:14
keepAliveTime和TimeUnit unit:表示在线程空闲下来时,是否超时释放线程池中保存的线程资源,keepAliveTime为时间量,unit为时间单位(TimeUnit.SECONDS为秒,TimeUnit.MILLISECONDS为毫秒,TimeUnit.MICROSECONDS微秒,TimeUnit.NANOSECONDS纳秒,还有SECONDS,MINUTES,HOURS,DAYS),默认情况下设置的allowCoreThreadTimeOut=false,因此线程池中会保存corePoolSize数量的核心线程数,其他大于的线程会超时keepAliveTime销毁,如果allowCoreThreadTimeOut=true则会销毁线程池中的空闲线程直到为0。
对于allowCoreThreadTimeOut写了个测试例子如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(5, 8,
1L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(2));
//executorService.allowCoreThreadTimeOut(true);
System.out.println("插入5个核心线程");
for(int i=0;i<5;i++){
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("第" + index + "个核心线程:" + System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
当将代码中的executorService.allowCoreThreadTimeOut(true);注释掉时,allowCoreThreadTimeOut默认为false,此时结果为:
插入5个核心线程
第0个核心线程:1515052978417:Thread ID:11
第1个核心线程:1515052978417:Thread ID:12
第3个核心线程:1515052978417:Thread ID:14
第4个核心线程:1515052978417:Thread ID:15
第2个核心线程:1515052978418:Thread ID:13
main方法一直没有结束,说明线程池还保留着,当将其中的注释去掉可以看到打印结果在1s后会显示结束标识
插入5个核心线程
第0个核心线程:1515052978417:Thread ID:11
第1个核心线程:1515052978417:Thread ID:12
第3个核心线程:1515052978417:Thread ID:14
第4个核心线程:1515052978417:Thread ID:15
第2个核心线程:1515052978418:Thread ID:13
Process finished with exit code 0
BlockingQueue<Runnable> workQueue : 创建线程池的第五个参数,标识线程进入时的排队策略,当提交一个新的任务到线程池以后,线程池会根据当前池子中正在运行的线程数量,采用以下几种策略:
借用http://blog.csdn.net/clevergump/article/details/50688008中的说明:
(1) 直接切换. 常用的队列是 SynchronousQueue (同步队列). 这种队列内部不会存储元素. 每一次插入操作都会先进入阻塞状态, 一直等到另一个线程执行了删除操作, 然后该插入操作才会执行. 同样地, 每一次删除操作也都会先进入阻塞状态, 一直等到另一个线程执行了插入操作, 然后该删除操作才会执行. 当提交一个任务到包含这种 SynchronousQueue 队列的线程池以后, 线程池会去检测是否有可用的空闲线程来执行该任务, 如果没有就直接新建一个线程来执行该任务而不是将该任务先暂存在队列中. “直接切换”的意思就是, 处理方式由”将任务暂时存入队列”直接切换为”新建一个线程来处理该任务”. 这种策略适合用来处理多个有相互依赖关系的任务, 因为该策略可以避免这些任务因一个没有及时处理而导致依赖于该任务的其他任务也不能及时处理而造成的锁定效果. 因为这种策略的目的是要让几乎每一个新提交的任务都能得到立即处理, 所以这种策略通常要求最大线程数 maximumPoolSizes 是无界的(即: Integer.MAX_VALUE). 静态工厂方法 Executors.newCachedThreadPool() 使用了这个队列。
**(2) 使用无界队列 **(也就是不预设队列的容量, 队列将使用 Integer.MAX_VALUE 作为其默认容量, 例如: 基于链表的阻塞队列 LinkedBlockingQueue). 使用无界队列将使得线程池中能够创建的最大线程数就等于核心线程数 corePoolSize, 这样线程池的 maximumPoolSize 的数值起不到任何作用. 如果向这种线程池中提交一个新任务时发现所有核心线程都处于运行状态, 那么该任务将被放入无界队列中等待处理. 当要处理的多个任务之间没有任何相互依赖关系时, 就适合使用这种队列策略来处理这些任务. 静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。
(3) 使用有界队列 (例如: 基于数组的阻塞队列 ArrayBlockingQueue). 当要求线程池的最大线程数 maximumPoolSizes 要限定在某个值以内时, 线程池使用有界队列能够降低资源的消耗, 但这也使得线程池对线程的调控变得更加困难. 因为队列容量和线程池容量都是有限的值, 要想使线程处理任务的吞吐量能够在一个相对合理的范围内, 同时又能使线程调配的难度相对较低, 并且又尽可能节省系统资源的消耗, 那么就需要合理地调配这两个数值. 通常来说, 设置较大的队列容量和较小的线程池容量, 能够降低系统资源的消耗(包括CPU的使用率, 操作系统资源的消耗, 上下文环境切换的开销等), 但却会降低线程处理任务的吞吐量. 如果发现提交的任务经常频繁地发生阻塞的情况, 那么你就可以考虑增大线程池的容量, 可以通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量. 而设置较小的队列容量时, 通常需要将线程池的容量设置大一点, 这种情况下, CPU的使用率会相对较高, 当然如果线程池的容量设置过大的话, 可能会有非常非常多的线程来同时处理提交来的多个任务, 并发数过大时, 线程之间的调度将会是个非常严峻的问题, 这反而有可能降低任务处理的吞吐量, 出现过犹不及的局面.
ThreadFactory threadFactory : 主要是创建线程所用的线程工程,一般使用默认的Executors.defaultThreadFactory就可以了,还有另一个PrivilegedThreadFactory,用这个工厂创建的新线程与当前线程具有相同的权限。
RejectedExecutionHandler handler:拒绝策略,当线程池和队列中已经被新线程填满时,线程池将不再接收新的线程了,拒绝的时候采用的策略主要有一下几种:
(1)AbortPolicy:拒绝时直接抛异常
(2)CallerRunsPolicy:将新提交的任务放在 ThreadPoolExecutor.execute()方法所在的那个线程中执行.
(3)直接不执行新提交的任务.
(4)DiscardOldestPolicy:
① 当线程池已经关闭 (SHUTDOWN) 时, 就不执行这个任务了, 这也是 DiscardPolicy 的处理方式.
② 当线程池未关闭时, 会将阻塞队列中处于队首 (head) 的那个任务从队列中移除, 然后再将这个新提交的任务加入到该阻塞队列的队尾 (tail) 等待执行.
下面是部分源码解析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//初始化时private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//由此可得ctl = 111 00000 00000000 00000000 00000000
int c = ctl.get();
//根据private static int workerCountOf(int c) { return c & CAPACITY; }
//private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//则:CAPACITY = 000 11111 11111111 11111111 11111111
// workerCountOf(c) 前三位与0做与操作,均为0,后29位与1做与操作,则是其真实值
//第一次执行时,workerCountOf(c) = 000 00000 00000000 00000000 00000000
if (workerCountOf(c) < corePoolSize) {
//当线程数小于corePoolSize时,直接加入创建worker对象执行线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//当线程池当前运行状态为Runnable时,将新线程插入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次检查,判断是否是运行状态
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列已满时,添加新线程为非核心线程
else if (!addWorker(command, false))
reject(command);
}