Java资料整理java杂谈线程池

ThreadPoolExecutor源码解析

2016-12-12  本文已影响956人  miaoLoveCode

引言


为什么引入线程池技术?

对于服务端的程序,经常面对的是执行时间较短、工作内容较为单一的任务,需要服务端快速处理并返回接口。假若服务端每次接收到一个任务,就创建一个线程,然后执行,这种方式在原型阶段是不错的选择,但是面对成千上万的任务提交进服务器时,这个时候将会创建数以万记的线程,这很明显不是一个好的选择。为什么呢?

线程池技术很好的解决了这个问题,它预先创建一定数量的线程,用户不能直接控制线程的创建和销毁,重复使用固定或者较为固定数目的线程来完成任务的执行。这样做的好处:


ThreadPoolExcutor源码解析

在看具体的源码之前,先给一个线程池使用案例

线程池使用案例
  1. 创建线程池对象;
  2. executor.submit(Runnable task)提交10个任务;
  3. executor.submit(Callable<T> task)提交5个任务;
  4. 所有线程的管理都由线程池来原理,程序员不需要关注线程的创建销毁。

构造方法

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, 
                          long keepAliveTime, 
                          TimeUnit unit, 
                          BlockingQueue<Runnable> workQueue, 
                          RejectedExecutionHandler handler) {    
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}

核心参数:

线程池内部状态

线程池内部状态

线程池用ctl的低29位表示线程池中的线程数,高3位表示当前线程状态,后续假如想要增大这个值,可以将AtomicInteger改成AtomicLong。

接下来就以submit方法入手,分析一下相关源码。

submit任务提交

public Future<?> submit(Runnable task) {
  //提交的task为null,抛出空指针异常   
  if (task == null)
       throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //执行任务
    execute(ftask);
    return ftask;
}

整个任务的提交核心都在任务执行这部分,执行任务,拿到返回值。

任务执行execute
public void execute(Runnable command) {
  if (command == null)
     throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
  } else if (!addWorker(command, false))
      reject(command);
}

具体的执行流程如下:

总结一下,对于使用线程池的外部来说,线程池的机制是这样的:

  1. 如果正在运行的线程数 < coreSize,马上创建线程执行该task,不排队等待;
  2. 如果正在运行的线程数 >= coreSize,把该task放入队列;
  3. 如果队列已满 && 正在运行的线程数 < maximumPoolSize,创建新的线程执行该task;
  4. 如果队列已满 && 正在运行的线程数 >= maximumPoolSize,线程池调用handler的reject方法拒绝本次提交。

addWorker实现

从全局来看,ThreadPoolExcutor一定维护一个池:

workers

addWorker的实质是向该HashSet里add一个worker,worker有一个线程,这个线程执行完成时,会从该HashSet里remove掉。

看一下addWorker的具体代码实现:

addWorker前半部分

这只是addWorker的前半部分代码,首先,判断当前线程的状态是否符合条件,不符合条件不做处理直接返回;通过参数core判断当前线程是否为核心线程,如果是核心线程,跳出循环创建新的线程来执行该task,在break retry的时候会执行compareAndIncrementWorkerCount(c),利用CAS原则,将线程数量+1。

看看创建线程部分(addWorker的后半部分)代码实现:

创建线程部分代码

创建线程部分最核心的操作就是:new一个新的worker,add进HashSet,然后启动woker里的Thread。
从源码可以看到,在执行add之前先活取了mainLock锁,该锁是一个公用的可重入锁:

private final ReentrantLock mainLock = new ReentrantLock();

addWorker的4种调用方式

  1. addWorker(command, true)
    线程数 < coreSize时,将task放入workers,如果线程数 >= coreSize,返回false;
  2. addWorker(command, false)
    当阻塞对列已满,尝试将新的task放入workers,如果线程数 >= maximumPoolSize,返回false;
  3. addWorker(null, false)
    放入一个空的task到workers,此时线程数的限制是maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
  4. addWorker(null, true)
    放入一个空的task到workers,线程数 < coreSize。实际的使用是在prestartCoreThread()等方法,有兴趣的读者可以自行阅读,在此不做详细赘述。

Worker具体实现

在addWorker中,t.start()使线程就绪,我们来看看Worker类的具体设计。

addWoker中的t.start()实质上是执行Worker的run()方法:

public void run() {
    runWorker(this);
}

run方法主要干了一件事,调用runWorker(this),接下来我们来看看runWorker的具体实现。

runWorker具体实现

runWoker具体实现
  1. 线程启动后,释放锁,设AQS状态为0;
  2. 获取firstTask任务并执行,执行任务前后可定制beforeExecuteafterExecute;
  3. 如果worker自己的task为null,调用getTask从阻塞队列获取等待任务执行,否则,阻塞该方法。

getTask具体实现

private Runnable getTask() {
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //必要情况下需要检查workQueue是否为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //如果线程池允许线程超时或者当前线程数大于核心线程数,则会进行超时处理
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { 
            if (compareAndDecrementWorkerCount(c))
                  return null;
            continue;
        }
        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r; 
           timedOut = true;
        } catch (InterruptedException retry) { 
           timedOut = false;
        }
    }
}

整个getTask循环实现:

上一篇下一篇

猜你喜欢

热点阅读