线程池

线程池ThreadPoolExecutor

2017-03-23  本文已影响36人  丁木木木木木

主要内容

前言

说到创建线程大家首先会想到调用线程的start()来创建,但如果在并发环境下,且线程执行时间都较短的话,这样频繁地创建和销毁线程会降低系统的效率
这时候就需要工具来统一对线程进行分配、监控以及管理。java1.5后引入的Executor线程池可以达到这样的效果,将任务的定义和执行解耦,定义好线程提交给线程池后,就不用管它怎么执行。接下来主要讲解ThreadPoolExecutor

ThreadPoolExecutor

调用初始化线程池方法时发现除了newScheduledThreadPool特殊以外,其他方法内部都是基于ThreadPoolExecutor实现的,例如创建固定数量的线程池newFixedThreadPool、创建单个线程newSingleThreadExecutor等,代码举例如下。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

因此先来了解下核心类ThreadPoolExecutor

继承关系

void execute(Runnable command);
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);//新建FutureTask
        execute(ftask);
        return ftask;
    }

线程池原理

线程池状态

   /** 
      ctl变量设计得很巧妙,用这个AtomicInteger来表示workerCount和runState,
      其中runState占高3位,后29位为workerCount的值。 
      workerCount:当前活动的线程数; 
      runState:线程池的当前状态。 
    */  
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //COUNT_BITS表示workerCount占的位数,29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //CAPACITY表示的是workderCount能使用的最大个数,值是0001...1(29个1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    /** 
      下面五个值表示的是线程池的状态,存储在高3位。
      可以直接用<=>大于来比较状态,running<shutdown<stop<tidying<terminated 
    */ 
    //高三位是111。初始时线程处于这个状态,能接受新任务,且处理阻塞队列中的任务。
    private static final int RUNNING    = -1 << COUNT_BITS;

    //高三位是000。不接受新的任务,但会处理完阻塞队列中的任务。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    //高三位是001。不接受新的任务,也不处理阻塞队列中的任务,会去尝试中断正在执行的任务。
    private static final int STOP       =  1 << COUNT_BITS;

    //高三位是010。所有任务都被终止了,且当前线程数为0,转到这个状态的线程即将执行terminated()的钩子方法。
    private static final int TIDYING    =  2 << COUNT_BITS;

    //高三位是011。terminated()执行结束。
    private static final int TERMINATED =  3 << COUNT_BITS;

    /** 
      这个方法用于取出runState线程池当前状态的值,传入的参数为ctl。 
      ~为按位取反操作,则~CAPACITY值1110...0(29个0), 
      然后做&操作,低29位置为0,得到了高3位就是runState的值
    */ 
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    /** 
      这个方法用于取出workerCount线程池当前线程个数,传入的参数为ctl。 
      CAPACITY值0001...1(29个1), 
      然后做&操作,高3位置为0,得到了低29位就是workerCount的值
    */ 
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    /** 
      这个方法用于将runState和workerCount组装成一个值,
      传入的参数rs其实就是runState、wc是workerCount。 
    */ 
    private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池中状态转换

线程池的创建

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

解释下各个参数。

任务提交

ThreadPoolExecutor类中核心的任务提交方法是executor方法。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        //如果当前线程数小于corePoolSize的话,增加线程来执行新传入的任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//添加新线程,true表示要检测当前线程数量小于corePoolSize
                return;//创建成功,终止方法的执行
            c = ctl.get();//新线程创建失败,记录当前runState和workerCount
        }
        //当前线程如果是运行状态,并且成功插入到缓冲队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//线程池不是运行状态,且成功从缓冲队列中删除任务
                reject(command);//使用线程池指定的饱和策略处理任务
            else if (workerCountOf(recheck) == 0)//当前活动线程为0
                addWorker(null, false);//添加新线程,新建线程对应的任务为null
        }
        /** 
          当前线程池不是运行状态;
          当前线程池是运行状态,但缓冲队列已经满了不能再添加新任务。
          满足任意一个条件尝试着新建线程。
        */
        else if (!addWorker(command, false))//新建线程失败,这里的false表示要检测当前线程数量小于maximumPoolSize
            reject(command);//使用线程池指定的饱和策略处理任务
    }

提交任务过程中有个重要的方法addWorker,接下来详细介绍这个方法。

    /**
     * 根据当前线程池状态以及参数来校验是否能新增线程,新增线程
     * @param firstTask 这个新增线程的第一个任务
     * @param core true表示新增时判断当前活动线程是否小于corePoolSize;false与maximumPoolSize判断
     * @return true表示成功新增一个线程,false创建线程失败
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//获取当前ctl变量值
            int rs = runStateOf(c);//当前线程池状态

            /** 
              等价于rs>=SHUTDOWN && (rs!=SHUTDOWN || firstTask!=null || workQueue.isEmpty())
            */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//当前线程个数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//使用CAS将当前线程个数加一
                    break retry;//成功的话跳出循环,进入创建新线程的过程
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//如果线程池状态改变,则进入外循环重新迭代
                    continue retry;
                //因为当前线程个数改变导致的失败,则迭代将当前线程个数加一的操作
            }
        }

        //重头戏来了!!
        //接下来是创建新线程的过程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();//再次进行状态的校验
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);//新创建的线程加入到workers(HashSet)中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//创建线程失败
        }
        return workerStarted;
    }

未完。。。

上一篇下一篇

猜你喜欢

热点阅读