ThreadPoolExecutor源码解析

2019-10-03  本文已影响0人  橙味菌

线程池 ThreadPoolExecutor

通过使用线程池减少创建和销毁线程开销,线程池命令执行器,将提交的命令交给线程池中的线程处理

作用 API
创建线程池 ThreadPoolExecutor(核心线程数corePoolSize,
最大线程数maximumPoolSize,
空闲线程最大存活时间keepAliveTime,
存活时间的单位unit,
存放任务的队列workQueue,
创建线程的工厂threadFactory
被拒绝任务的处理程序handler)
设置核心线程是否可超时 allowCoreThreadTimeOut(boolean value)
返回命令队列 BlockingQueue<Runnable> getQueue()
移除任务 remove(Runnable task)
清除已取消的命令 purge()
获取线程池大小 getPoolSize()
获取积极线程数 getActiveCount
获取线程池最大容量 getLargestPoolSize
获取命令数 getTaskCount
获取完成命令数 getCompletedTaskCount
设置自定义Thread工厂 setThreadFactory(ThreadFactory threadFactory)
状态码
Running -1 << COUNT_BITS
ShutDown 0 << COUNT_BITS
Stop 1 << COUNT_BITS
Tidying 2 << COUNT_BITS
Terminated 3 << COUNT_BITS
拒绝执行处理器子类
CallerRunsPolicy 调用所在线程执行命令
AbortPolicy 抛出异常
DiscardPolicy 丢弃此命令
DiscardOldestPolicy 丢弃队列里最近的一个命令,执行此命令

继承关系

ThreadPoolExecutor继承关系

Executor接口

命令执行器接口

定义execute(Runnable)方法(此方法在将来执行提交的命令)

ExecutorService接口

命令执行器服务接口

定义了更多的控制执行提交命令的方法

作用 API
关闭,停止提交命令 shutdown
关闭,停止执行、等待和提交命令<br />返回正在执行的命令集合 shutdownNow
阻塞终止,停止提交命令,等待已提交命令执行完成 awaitTermination(long timeout, TimeUnit unit)
是否已关闭 isShutdown
是否已终止 isTerminated
提交命令,返回执行期望Future submit(Callable<T> task)
提交一组Callable命令,返回一组执行结果T invokeAllinvokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
提交一组Callable命令,返回第一个成功执行结果T invokeAnyinvokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

AbstractExecutorService

ExecutorService的骨架类,简单实现了submit、invokeAll等方法

//调用execute执行
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//遍历调用execute方法执行并获取期望Future,遍历Future阻塞等待执行结果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
    

执行期望Future接口

代表异步计算的执行结果

作用 API
取消执行 cancel
阻塞获取结果 get
是否已取消 isCanneled
是否已完成 isDone

FutureTask

实现了RunnableFuture接口(顾名思义,此接口继承了Runnable和Future两个接口)
Future接口 可以查看异步计算的结果
提供了cancel、isCanceled、isDone、get方法
Callable接口 执行的任务需实现此接口
提供call方法,有一个泛型返回值

源码解析

并发器——Worker内部类

继承自AQS,代表一个工作线程节点,由多个提交命令的线程进行获取,获取到的线程可执行命令

//创建工作线程节点,并发器状态-1,创建新线程
Worker(Runnable firstTask) {
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

//独占式尝试获取资源——state由0变1,设置当前线程独占并发器
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
//独占式尝试释放资源——state设置为0,取消独占
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

执行命令——execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //ctl是原子整型,代表线程池状态码
    int c = ctl.get();
    //当前线程数小于核心线程容量,添加核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //线程池运行中且工作队列接受命令,workQueue是BlockedQueue<Runnable>
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次确认发现线程池停止运行且无法移除命令——拒绝命令
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //线程数为0,添加空命令工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //添加空命令工作线程,无法添加工作线程则拒绝命令
    else if (!addWorker(command, false))
        reject(command);
}

添加工作线程节点

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池非运行中(<shutDown的状态码只有Running),且未关闭工作线程或队列不为空——失败
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //增加线程池工作线程数量的数字
        for (;;) {
            int wc = workerCountOf(c);
            //工作线程数达到容量或添加的是核心线程且核心工作线程数达到核心线程容量——失败
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //增加并成功更新节点数——跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  
            //线程池状态变动——再次进入外循环查看工作线程队列是否为空
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
     //尝试插入工作线程
    try {
       //创建新工作线程节点
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //上锁
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个HashSet,代表了线程池所有工作线程
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动工作线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

工作线程运行

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //工作节点释放资源——并发锁state设置为0
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        //获取命令(未设置则从workQueue中获取)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //Stop及以上状态——中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行前置
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //工作线程结束后置
        processWorkerExit(w, completedAbruptly);
    }
}

从命令队列获取一个命令——getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态Stop以上或Shutdown且命令队列为空——减少工作线程数
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //是否有时限——核心线程是否有时限||工作线程数超过核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //有时限则进行
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

关闭执行器——shutDown方法

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //确认权限——modifyThread
        checkShutdownAccess();
        //设置线程池状态为Shutdowm
        advanceRunState(SHUTDOWN);
        //终止工作节点
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //状态达到Tidying或命令队列为空即可返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //试图将状态切换为Tidying
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    //termination是mainLock的ConditionObject
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
上一篇下一篇

猜你喜欢

热点阅读