程序员Java 基础知识

JAVA 基础 之 线程池

2019-10-10  本文已影响0人  蓉漂里的小白

在执行一个多线程应用程序时创建多个线程,如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率。
因为线程的切换,创建和销毁都需要花费额外的开销。所以就推出了线程池的方式,把创建的线程都交给统一的集合去管理,使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务

1: ThreadPoolExecutor 类介绍

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

线程池共提供了这4个构造函数中可以看出ThreadPoolExecutor最重要的7个参数,corePoolSize, maxinumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
Handler.

1.1 corePoolSize

核心线程池大小,线程池创建时线程池中的线程数量是0 ,当有任务进入到线程池后,就创建一个线程来执行任务,直到核心线程数量达到corepoolSize数量后,就将新的任务投放到队列中

1.2 maximumPoolSize

线程池允许创建的最大线程数,
线程池中的线程数量取决于cpu的数据和应用程序的需求场景
1:多线程在应用程序中主要是用来做计算和逻辑判断的
线程数 = cpu 数量 + 1
2:多线程在应用程序中主要是用来做io操作的
线程数 = cpu 核数 * (1 + 平均等待时间 / 平均工作时间)

1.3 workQueue

任务缓冲队列,用来存放等待执行的任务
1:有界的任务队列ArrayBlockingQueue,基于数组实现的先进先出队列,创建时需要指定队列长度
2:无界的任务队列LinkedBlockingQueue,基于链表实现的先进先出队列,不需要指定队列长度,默认为Integer.MAX_VALUE;
3:直接提交队列synchronousQueue,它不会保存提交的任务到内存中,直接新建一个线程来执行新到达的任务

1.4 keepAliveTime

表示一个线程最多空闲多长时间,当某个线程空闲时间达到了keepAliveTime时就会被shutdown,这个参数只有在线程池中线程数大于corePoolSize时候才会生效

1.5 unit

参数keepAliveTime的时间单位,(microsecond,millisecond,second,minutes,hours,days)

1.6  handler:

拒绝新任务时的策略
1:AbortPolicy, 直接丢弃新任务并抛出RejctException
2:DiscardPolicy, 直接丢弃新任务,不抛出异常
3:DiscardOldestPolicy,丢弃任务队列中最老的一个任务(队列头的任务), 再将新的任务提交到任务队列中
4:CallerRunsPolicy,当新任务添加到线程池被拒绝时,线程池会将被拒绝的新任务添加到线程池正在运行的线程”中去运行

1.7 ThreadFactory

用于定制化生产线程,如设置一个更有意义的线程 名,还可以设定线程的优先级,高优先级的线程更容易得到调度。 设置线程是前台还是后台线程

2: 线程池的执行原理

image.png

2.1 ThreadPoolExecutor状态

线程池内部维护了一个AtomicInteger的变量,是一个int整型,共32位,用高3位来记录线程池状态,用低29位来记录线程的数量,所以一个线程池最大可容纳(2^29 -1)个任务

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

running:运行状态
terminated: 所有工作线程已经销毁,任务缓存队列已经清空或执行结束
shutdown: 不能接受新的任务但会等待,现有任务都执行完毕
stop: 不接受新的任务,并且会终止现在正在执行的任务
ctlOf是一个打包操作,ThreadPoolExecutor将线程池的状态和线程数量大包成一个值保存到ctl中。
runStateOf和workerCountOf是2个拆包操作,通过ctl中的值来计算线程池运行状态和线程数量

2.2 ThreadPoolExecutor的执行

最核心的任务提交方法是execute()方法,submit()也可以提交任务并且可以获取线程执行的返回值,但submit方法里面最终调用的还是execute()方法,
Submit 内部会创建一个新的eRunnableFuture,再调用execute(RunnableFuture)
Submit函数源码

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

execute()方法源码

int c = ctl.get(); //获取ctl中的Value
//计算线程池中的线程数量,如果小于核心线程数量,创建核心线程并执行
if (workerCountOf(c) < corePoolSize) { 
    if (addWorker(command, true)) 
        return;
    c = ctl.get();
}
//如果创建核心线程失败,判断线程池是否是工作状态,并尝试添加到工作队列中
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
/*
 * 如果执行到这里,有两种情况:
 * 1. 线程池已经不是RUNNING状态;
 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
 * 这时线程池就会创建临时线程来执行新任务
 * 如果失败则拒绝该任务
 */

else if (!addWorker(command, false))
    reject(command);

addWorker部分源码

private boolean addWorker(Runnable firstTask, boolean core) {
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //线程池状态为shutdonw或shutdownnow 并且
         // firstTask不null或者workqueue为空 则不需要创建新的线程
        if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN &&firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
//计算线程池中的线程数量
            int wc = workerCountOf(c);
//线程池中的线程数量大于线程池最大值或者大于设置的核心线程数/最大线程数        
if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
//创建新的线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
//创建worker对象,执行新的任务
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
。。。。。
。。。。

3: 使用示例

测试代码和测试结果

 //使用AbortPolicy reject策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2 ,10,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(3),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i=0;i<6;i++){
    ThreadResource task = new ThreadResource(i+1);
    //Executes the given task sometime in the future
    executor.execute(task);

    System.out.println("投放第:"+ (i+1) +"个任务到线程池中");
    System.out.println("线程池中的线程数量:" + executor.getPoolSize());
    System.out.println("队列中的任务数量:" + executor.getQueue().size());
    System.out.println("已经执行完的任务数目:" + executor.getCompletedTaskCount());
    System.out.println("#######################");
}

执行结果的部分内容如下:

投放第:6个任务到线程池中
线程池中的线程数量:2
队列中的任务数量:3
已经执行完的任务数目:0
#######################
正在执行任务ID:5的线程是:pool-1-thread-2
\color{red}{任务ID:5执行完成},执行线程是:pool-1-thread-2
正在执行任务ID:3的线程是:pool-1-thread-2
\color{red}{任务ID:1执行完成},执行线程是:pool-1-thread-1
正在执行任务ID:4的线程是:pool-1-thread-1
\color{red}{任务ID:3执行完成},执行线程是:pool-1-thread-2
\color{red}{任务ID:4执行完成},执行线程是:pool-1-thread-1
正在执行任务ID:6的线程是:pool-1-thread-1
\color{red}{任务ID:6执行完成,}执行线程是:pool-1-thread-1

第一个任务到来,创建核心线程执行,第2,3,4任务到来存入队列中,第5个任务到来创建临时线程执行,第6个任务到来,发现核心线程在工作,队列满了,临时线程也在工作,所以执行reject策略,丢掉队列中的第一个任务(任务2)将任务6放入任务2的位置。

上一篇 下一篇

猜你喜欢

热点阅读