java线程池核心方法

2018-03-29  本文已影响0人  何甜甜在吗

与线程池相关的核心类为ThreadPoolExecutor,但是在实际应用中,我们并不会调用这个类来使用线程池,而是使用Executors,Executors类封装了生成几种不同线程池的方法。

线程池的用法

ExecutorService es = Executors.newCachedThreadPool();
es.execute(Runnable run);

所以从execute(Runnable command)方法讲起,这个方法会将任务提交给一个线程进行执行
execute(Runnable command)方法源码:

//调用execute方法会将线程提交到线程池中  
 public void execute(Runnable command) {
        //如果任务为null,则抛出异常 
        if (command == null)
            throw new NullPointerException();
    
        //1.如果当前线程数量小于corePoolSize的大小
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {  
            //addWorker方法会进一步进行检查,因为有可能其他线程对线程池的状态做了改变
            //addWorker方法中进行两个检查1)线程池的状态 2)当前线程数量是否超过corePoolSize的大小
            //检查完成以后如果都否合要求,创建一个Worker,new Worker(Runnable r),如果条件满足,会将这个worker加入到HashSet<Worker>中去,并且返回true
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2.如果corePoolSize已经满了,则需要加入到阻塞队列
        //会进行一个判断,线程池的状态以及是否可以往阻塞队列中继续添加runnable
        if (isRunning(c) && workQueue.offer(command)) {  
            int recheck = ctl.get();
            //在进行一次判断,这个判断主要是为了有其他线程调用了shutDown或者shutDownNow方法,这时候如果再有任务就会拒绝执行
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果此时队列已满,则会采取相应的拒绝策略
        //addWorker中第二参数boolean core,如果false,则边界为maxmunSize,如果为true,则边界为corePoolSize
        else if (!addWorker(command, false))
            reject(command);
    }

execute(Runnable command)方法战略三步走
1.往corePoolSize中加入任务进行执行
2.当corePoolSize满时往阻塞队列中加入任务
3.阻塞队列满时并且maximumPoolSize已满,则采取相应的拒绝策略

因为execute(Runnable command)方法没有加锁,所以做了很多相同的判断,因为很有可能这个线程在执行execute方法时有其他线程已经完成了execute方法并且改变了线程池的状态(比如可能因为其他线程的执行导致corePoolSize已满,或者其他线程调用了shutDown()或者shutDownNow()方法拒绝在接受新任务)
可以看出execute(Runnable command)方法中最核心的方法是addWorker(Runnable firstTask, boolean core)
先看addWorker(Runnable firstTask, boolean core)方法的源码:

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //返回线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //打破死循环的关键
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //为什么叫addWorker方法,从这里就可以看出,创建了一个Worker对象,并且Runnable是其里面的一个字段
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //正真执行到往corePoolSize添加任务时会进行一个加锁操作
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //在加锁之后还会进行一个判断
                    //判断1.线程池是否在running 2.任务是否被移除
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //往corePoolSize中加入任务
                        workers.add(w);
                        int s = workers.size();
                        //调整曾经线程池拥有最大线程数的大小
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //改变状态
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //运行该任务,t.start方法最终会调用native void start0()方法 
                    t.start();
                    //改变状态
                    workerStarted = true;
                }
            }
        } finally {
            //如果加入失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

从代码中看出其实corePoolSize中就是维护了一个HashSet<Worker>
并且在真正往corePoolSize加入任务时会进行一个加锁,加完锁还会做一个验证
很多细节还待以后继续补充,现在学习源码的功力还不够

上一篇下一篇

猜你喜欢

热点阅读