Java 杂谈互联网科技Java

十分钟深入理解Java线程池源码及设计原理

2019-07-14  本文已影响15人  Java_苏先生

一、说明

下面如果有贴出源码,对应的源码是JDK8

主要的源码类

java.util.concurrent.ThreadPoolExecutor、
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.AbstractExecutorService

1. 类继承图

二、线程池的状态

三、源码分析

1. 完整的线程池构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

2. ctl

内部有重要的成员变量ctl,类型是AtomicInteger,低29位表示线程池中线程数,通过高3位表示线程池的运行状态

COUNT_BITS的值是29

3. 任务的执行

execute --> addWorker --> Thread.start --> (Thread.run) --> runTask --> getTask

3.1execute(Runnable command)

大致分三个步骤

3.2 addWorker(Runnable firstTask, boolean core)

3.3 runWorker(Worker w)

通过循环调用getTask()获取要执行的任务task
beforeExecute
task.run()
afterExecute

3.4 getTask()

源码如下:

private Runnable getTask() {
    Boolean timedOut = false;
    // 是否最后的 poll() 超时了?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        Boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // worker是否需要被淘汰
        if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
            // 这里会让线程的数量记录减,后面的return null,会导致runWorker没有获取到数据而让run()方法走到尽头,最终当前线程结束
            if (compareAndDecrementWorkerCount(c))
                            return null;
            continue;
        }
        try {
            // 如果需要回收一部分线程,那么超时时间keepAliveTime后拿不到就数据就继续循环调用,就可以在下一次循环的时候进行线程结束回收了;否则一直阻塞下去
            Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
            if (r != null)
                            return r;
            timedOut = true;
        }
        catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

四、任务执行,带返回值的

源码如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

代码比较简单,把任务封装成一个既实现Runnable, 也实现Future<v style="margin: 0px; padding: 0px;">的接口,这个时候就可以调用execute()进行实现了</v>

写在最后

上一篇 下一篇

猜你喜欢

热点阅读