线程池 ThreadPoolExecutor

2019-05-12  本文已影响0人  Wi1ls努力努力再努力
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数说明

参数 说明
corePoolSize 核心线程数,也指线程池的基本大小。当任务提交时,即时有其他空闲线程也会创建一个新线程执行任务,直到线程数等于 corePoolSize
maximumpoolSize 线程池最大数量。如果队列满并且创建的线程数小于最大线程数,则会创建新的现场执行任务。对于无界队列无用
keepAliveTime 工作线程活动保持时间。当工作线程空闲后,在 keepAliveTime 内没有重新执行任务,则会被销毁
unit keepAliveTime的单位
workQueue 任务队列,用以保存等待执行的任务的阻塞队列
threadFactory 先创创建工厂
handler 饱和策略,当队列和线程池满后,说明线程池处于饱和状态。

常见的线程池,用线程池工具类 Executors 创建

线程池 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactory handler
FixedThreadPool 自定义 等于 corePoolSize 0 MILLISECONDS LinkedBlockingQueue Executors$DefaultThreadFactory AbortPolicy
SingleThreadExecutor 1 1 0 MILLISECONDS LinkedBlockingQueue 自定义或Executors$DefaultThreadFactory AbortPolicy
CachedThreadPool 0 Integer.MAX_ VALUE 60 SECONDS SynchronousQueue 自定义或Executors$DefaultThreadFactory AbortPolicy

默认的构造函数

@Executors
public static ExecutorService newFixThreadPool(int nThreads);
public static ExecutorService newFixThreadPool(int  nThreads, ThreadFactory threadFactory)

public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor(ThreadFactory factory);

public static ExecutorService newCachedThreadPool();
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

FixThread 介绍:
从参数构造可以看出来 FixThread 内的线程全都是核心线程,使用的队列是无界阻塞队列 LinkedBlockingQueue,因此 FixThread 会从无界队列中反复获取任务交由核心线程执行


SingleThreadExecutor:
其核心线程数和最大线程数被设置为 1,任务队列使用无界队列 LinkedBlockingQueue,因此该线程池会反复从 LinkedBlockingQueue 中获取任务来执行,每次执行一个


CachedThreadPool:
其核心线程数为 0,最大线程数为Integer.MAX_VALUE,并且 keepAliveTime 设置为 60 秒,使用的是没有容量的 SynchronousQueue,因此每提交一个任务,如果没有多余的线程可以使用,那么每次都会创建一个线程去执行任务。于是乎如果任务的提交速度远大于任务的执行速度,那么会因为创建过多线程而耗尽 CPU。


线程池:
无论是 FixThread 还是 SingleThreadExecutor 还是 CacherThreadPool,其内部都是ThreadPoolExecutor对象,参数介绍在上面已经说明了。当执行一个任务时,需要调用 execute(Runnable)@ThreadPoolExecutor

public void execute(Runnable command){
    if(command == null){
        throw new NullPointException();
    }
        //默认值是 e0000000
    int c= ctl.get();
        //从 workerCountOf(c)返回 c&1ffffffff,因此线程最大为 0x1fffffff
    if(workerCountOf(c) < corePoolSize){
                //当线程数小于 corePoolSize,则
        if(addWorker(command, true)){
            c = ctl.get();
        }
    }
        //如果线程池是 Running 的并且任务入队成功
    if(isRunning(c) && workQueue.offer(command)){
        int recheck = ctl.get();
                //如果线程池非 Running 并且任务出队成功,则执行拒绝策略
        if(!isRunning(recheck) && remove(command)){
            reject(command);
        }else if(workerCountOf(recheck) == 0){
                        //
            addWorker(null,false);
        }
    }else if(!addWorker(command, flase)){
        reject(command);
    }
}

看一些参数说明

  //初始值是 e0000000,最高位表示线程池状态,低 7 位表示线程数,
  //如 e 代表 RUNNING,0 代表 SHUTDOWN,2代表 STOP,4代表 TIDYING,6代表 TERMINATED
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //1fffffff
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    0xe0000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    0x0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    0x20000000
    private static final int STOP       =  1 << COUNT_BITS;
    0x40000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    0x6000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
private boolean addWorker(Runnable firstTask, boolean core){
    retry:
    for(;;){
        int c= ctl.get();
        int rs=runStateOf(c);

        //runstateOf()是做 mask 操作,取出线程池状态
        //mask是 e00000000,
        //SHUTWODN是非 RUNNING状态的最小值,如果>=SHUTDOWN表示线程池非 RUNNING
        if(rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty())){
            return false;
        }

        for(;;){
            int wc = wokerCountOf(c);
            //线程池当前线程数量大于线程池容量 或
            //如果是用核心线程执行且大于核心线程数阈值
            //或是用非核心线程执行且大于非核心线程数阈值
            if(wc >= CAPACITY
                || wc>=(core? corePoolSize : maximumPoolSize)){
                return false;
            }
            //CAS 增加线程池线程数,则否重试
            if(compareAndIncrementWorkerCount(c)){
                break retry;
            }
            //? re-check
            c = ctl.get();
            if(runStateOf(c) != rs){
                continue retry;
            }
        }
    }
    //以下是为了增加工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try{
        //将 Runnable 封装为 Worker,内部用线程工厂类创建线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if(t!=null){
            final RenntrantLock mainLock = this.mainLock;
            //锁
            mainLock.lock();
            try{
                int rs = runStateOf(ctl.get());

                if(rs < SHUTDOWN
                    ||(rs == SHUTDOWN && firstTask == null)){
                    if(t.isAlive()){
                        throw new IllegalThreadStateException();
                    }
                    workers.add(w);
                    int s = workers.size();
                    if(s>largestPoolSize){
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            }finally{
                main.unlock();
            }
            if(workerAdded){
                                //开始工作
                t.start();
                workerStarted = true;
            }
        }
    }finally{
        if(!workStarted){
            addWorkerFailed(w);
        }
    }
    return workerStated;
}
@ThreadPoolExecutor$Worker
Worker(Runnable firstTask){
  setState(-1);
  this.firstTask = firstTask;
  this.thread=getThreadFactory().newThread(this);
}
//当在 addWorker()中调用 Worker.thread.start()则会调用 run()@Worker
public void run(){
  runWorker(this);
}

final void runWorker(Worker w){
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try{
        //循环取任务并且执行
        while(task !=null || (task = getTask) !=null){
            w.lock();
            if((runStateAtLeast(ctl.get(),STOP)
                ||(Thread.interrupted() && 
                runStateAtLeast(ctl.get(),STOP)))
                &&!wt.isInterrupted()){
                wt.interrupt();
            }
            try{
                beforeExecute(wt, task);
                Throwable thrown = null;
                try{
                    task.run();
                }catch(Exception x){
                    throw x;
                }finally {
                    afterExecute(task, thrown);
                }
            }finally{
                task = null
                w.completedTasks++;
                w.unlock;
            }
        }   
        completedAbruptly = false;
    }finally{
        processWorkerExit(w, completedAbruptly);
    }
}
//从任务队列取出任务
private Runnable getTask(){
    boolean timeOut = false;
    for(;;){
        int c = ctl.get();
        int rs = runStateOf(c);

        if(rs>= SHUTDOWN && (rs >= STOP)||workQueue.isEmpty()){
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if((wc>maximumPoolSize ||(timed && timedOut))
            &&(wc > 1 || workQueue.isEmpty())){
                if(compareAndDecrementWorkerCount(c)){
                    return null;
                }
                continue;
        }

        try{
            Runnable r  = timed ?
                workQueue.pool(keepALiveTime, TimeUnit.NANOSECOND):
                workQueue.take();
            if(r !=null){
                return r;
            }
            timeOut = true;
        }catch(InterruptedException retry){
            timeOut = false;
        }
    }
}

。。。

上一篇下一篇

猜你喜欢

热点阅读