线程池原理

2018-05-08  本文已影响9人  gczxbb

线程池,关注如何缩短频繁线程创建和销毁的时间,通过线程复用技术,减少非核心任务的时间损耗(创建和销毁的时间),提高程序性能。它的主要原理是采用阻塞任务队列实现线程复用的方案。ThreadPoolExecutor是线程池的具体实现类,它的继承关系图。

ThreadPoolExecutor继承关系图.jpg

线程池类型

源码提供了三种类型的线程池,通过不同的参数设置,缓存线程池、单线程线程池、固定数量线程的线程池。ThreadPoolExecutor的构造方法。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

这几个参数决定线程池类型,核心参数。
corePoolSize,核心线程数。
maximumPoolSize,线程池的允许的最大线程,超过会报异常。
keepAliveTime,非核心线程允许存留时间,(保持活跃性的时间)。
TimeUnit,时间度量参数。
BlockingQueue<Runnable>,任务队列,可以使配置成无限或有限队列或栈。
ThreadFactory,线程工厂,创建线程优先级以及统计线程数量等。

线程池的对任务的控制流程

  • 线程数量<corePoolSize,新建线程,处理任务。
  • 线程数量>=corePoolSize,将任务放入workQueue队列,若有核心线程空闲,从workQueue队列取任务处理。
  • 当任务队列有限且已满,再次新建线程处理任务,这时,要保证总量不超过最大允许值,否则,导致RejectedExecutionHandler异常。
  • 再次新建的线程是非核心线程,空闲时最大存留keepAliveTime时间。

三种线程池是Executors的静态方法创建。

缓存线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

核心线程数是0,允许的最大线程无限,任务队列SynchronousQueue不做具体存储,线程活跃时间60秒。
每个任务先到SynchronousQueue队列,它其实一个管道,不保存,若不存在空闲线程则新建,新建线程数量永远不会超过允许的最大值。
若一开始并发任务较多,会创建不少线程,每个线程任务完成后,变空闲线程,空闲时间未达到60s,可重用空闲线程处理新进任务,线程最大数量不限。即可以保障任务繁重时,空闲线程可复用,又能办证在没有任务时,保持一定时间后消失。

单线程线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

核心线程是1,允许的最大线程是1,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
只有一个核心线程处理任务,新任务入队列。若队列是有界的,多并发任务状态下队列总有满的时候,若队列满了就得新建临时线程,肯定会超允许的最大线程,报异常,因此,队列必须无界。

固定数量线程的线程池

Executors#newFixedThreadPool方法创建固定数量线程的线程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory 
                    threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

核心线程与最大线程数量自己配置,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
任务队列无界,因此,不会创建临时工线程,只有核心线程工作,keepAlieveTime无需设置。

综上所述

允许的最大线程数量与队列必须保证一个是无界的,否则,在高并发条件下导致异常。根据特定的业务场景灵活配置。


工作原理

当线程池ThreadPoolExecutor创建后,我们一般调用它的execute方法,向线程池派发任务,将任务交给线程池,不需要自己再管理,会自动执行。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//小于核心线程
        if (addWorker(command, true))//创建核心线程
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//加入队列
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//队列满了,创建非核心临时线程
        reject(command);
}

首先,workerCountOf(c)方法,返回工作线程数量,若小于核心线程数量,调用addWorker方法,创建核心线程(设标志位参数),该任务将由新建的核心线程处理。
然后,若工作线程数量>=核心线程,isRunning(c)方法,表示c<0,将任务加入队列,offer方法,加入后返回成功标志。
因为offer是非阻塞方法,也就是说,如果队列已满,将直接返回失败,这时,将调用addWorker方法,创建非核心线程(不设标志位参数)。该任务将由新建的非核心线程处理。

以下是上面代码用到的一些变量与方法。
//AtomicInteger确保高并发整型值自增时线程安全。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911

private static final int RUNNING    = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
private static final int STOP       =  1 << COUNT_BITS;//536870912
private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
private static final int TERMINATED =  3 << COUNT_BITS;//1610612736

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

ctl的初始值与RUNNING相同,值是-536870912,即11100000 00000000 00000000 00000000。
-1存储的补码是1111...(32个1),向左移动29位,即右边加29个0。

workerCountOf方法将c与CAPACITY进行与操作。
runStateOf方法将c与CAPACITY的取反进行与操作。

CAPACITY是536870911,即00011111 11111111 11111111 11111111。
CAPACITY的取反是11100000 00000000 00000000 00000000。

workerCountOf方法,第一次调用时,ctl初始值与CAPACITY与操作结果是0,若ctl不断自增,与CAPACITY操作的值不断自增1,工作线程初始是0,每增加一个工作线程,ctl自增一次。
runStateOf方法,第一次调用时,ctl初始值与反CAPACITY与操作结果为反CAPACITY。
总之,workerCountOf方法返回工作线程数量,runStateOf方法返回ctl的初始值-536870912。
五个运行状态,RUNNING状态<0,isRunning方法,判断c是否<0,每次新建线程后c值自增1,c初始值-536870912,一般情况下,不足以使达到c>=0,因此,isRunning(c)方法一般返回true。

下面分析一下addWorker创建线程方法。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        ...
        for (;;) {
            int wc = workerCountOf(c);//工作线程数量
            if (wc >= CAPACITY ||// 已经等于最大值。
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建线程,绑定任务。
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//上锁
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);//任务放入集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; //后续根据此标志启动线程。
                }
            } finally {
                mainLock.unlock();//解锁
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

两层死循环,若工作线程>CAPACITY容量,或>=允许的最大值(创建核心线程>=核心线程数量),返回失败。自增ctl,跳出循环。创建一个Worker对象,构造方法。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

它是Runnable类型,封装派发给线程池的Runnable任务,ThreadFactory的newThread方法,创建一个新线程,将Worker类作为任务主体。该类的结构关系图。

线程池的Worker任务关系图.jpg 我们再回到addWorker方法,runStateOf(ctl.get())<0时(一般<0),将Worker任务放入HashSet集合,设置workerAdded标志。然后,根据该标志,调用Thread的start方法,启动线程,设置启动标志workerStarted,表示线程已启动执行任务。最后,返回启动标志。
新线程执行Worker任务,触发Worker#run方法。该方法调用runWorker方法。它是外部类ThreadPoolExecutor的方法,入参就是该Worker。
final void runWorker(Worker w) {//处理Worker内的派发任务,在循环中进一步访问任务队列。
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            ....
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

该方法中,新建线程借助Worker任务,开始为线程池工作,内部Runnable是用户需要完成的任务。在while循环中,第一个优先处理execute派送,Worker内部的线程池任务,完成后,线程也不会结束,而是getTask方法,继续从任务队列中获取务,如果getTask返回空,结束线程,否则,继续执行,该方法可能会阻塞。

private Runnable getTask() {
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        ...//队列空时,返回null。
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果工作线程数量wc>核心线程,设置timed标志。从队列取任务采用阻塞等待一定时间的poll方式,等待时间设置线程存活时间keepAliveTime,因此,即使队列没有任务,线程仍然存活,(保证任务来到可立即开始工作)。
如果当工作线程数量wc<核心线程,采用一直阻塞的take方案,队列是空时线程一直阻塞,核心线程不会死亡。
如果队列是空,poll时间已到,设置timeOut超时标志,进入下次循环,这时,如果再发生工作线程数量wc>核心线程,会使得timed和timedOut标志同时存在,此时,工作线程数量自减,返回空,退出while循环,线程结束。如果工作线程数量wc<核心线程(仅有核心线程),工作线程不会自减,for循环继续,阻塞在take查询任务。
如果设置核心线程TimeOut,也会采用poll方式,存活时间一到,队列无任务,即使wc数量<核心线程,线程也会退出,允许核心线程死亡。
每一个新建的线程在该方法的执行逻辑是相同。根据当前线程数量和超时标志决定从任务队列的获取方法是否阻塞。


总结

线程池的本质,每一个线程在完成派发任务后,并未结束,继续访问任务队列。根据当前线程数量,利用任务队列的阻塞特性,实现线程的存留时间。通过留存工作线程消费,不新建线程,实现任务的线程复用。


任重而道远

上一篇下一篇

猜你喜欢

热点阅读