javaweb

Java8线程池——底层为LinkedBlockingQueue

2020-09-23  本文已影响0人  雁阵惊寒_zhn

线程池使用的一个例子

这里约定,细节在代码中注释说明,代码外正文的描述文字展示程序的大体流程。

int nThreads = 10;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
Runnable task = new Runnable(){
    public void run(){
        //do something
    }
};
exec.execute(task);

这里的工厂方法newFixedThreadPool()初始化的是ThreadPoolExecutor实例,在线程池实现的工厂方法newSingleThreadExecutor()与newFixedThreadPool()一样构建底层为LinkedBlockingQueue的ThreadPoolExecutor。而newCachedThreadPool()初始化的是底层为SynchronousQueue的ThreadPoolExecutor实例。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor的execute()

任务到来执行execute()方法。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 执行过程分为三步:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 1. 如果少于corePoolSize条线程在运行,那么试图启动一条新的线程,并且把当前的命令作   
     *  为这条线程将要执行的第一个任务。addWorker()方法原子性检查runState和workerCount
     *  的状态,以防止不应该增加线程时添加了线程。
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 2. 如果一个任务成功入队,我们需要进行双重的校验,校验我们是否应该增加一个线程(因
     * 为从上次校验之后存在的线程可能已经死掉了)或者检查线程池是否关闭了。所以需要再次
     * 检验,如果需要进行回滚出队。或者,如果没有线程,就开始一条新的线程。
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     *
     * 3. 如果任务入队不成功,我们试图增加新的进程。仍旧失败的话,直接拒绝执行任务。
     */
    int c = ctl.get();
    //①
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //②
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //③
    else if (!addWorker(command, false))
        reject(command);
}

上面的execute()方法的源代码中注释的标号,对应着上面的英文注释,中文翻译在对应英文下面。先大体看流程,execute()方法执行新的任务,当一个任务到来后,做必要的校验,根据检验的不同结果,执行不同的逻辑实现。但是任务的执行都是通过addWorker()方法完成的,下面流程走的addWorker()方法中

ThreadPoolExecutor的addWorker()

addWorker()方法代码片段

private boolean addWorker(Runnable firstTask, boolean core) {
    ....省略一些校验逻辑.....
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//①
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;//②
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//③
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

代码片段中有三点重要之处,分别标注了序号。

  1. 到来的任务,被封装在一个Worker的对象中。Worker对象字段thread赋值给变量t;
  2. 对线程池状态的参数设置和进一步的最终校验需要加锁,保证并发时的线程安全,和数据的一致性。
  3. Thread变量t执行start()方法。

梳理上面的代码片段,到来的任务会被先封装Worker对象,然后Worker对象返回一个Thread对象,这个Thread对象最终执行。下面走到Worker类的源代码。

ThreadPoolExecutor的Worker类

//Worker的构造函数
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

构造Worker对象时,Runnable任务作为当前线程的第一个任务赋值到firstTask,如果确实是第一个任务firstTask不为null,否则为null,然后去队列中取已经入队的任务。字段thread指向通过ThreadPoolExecutor中定义的线程工厂,生产的封装了Worker对象自己的线程。字段thread指向的对象执行start()方法(上面addWorker()方法代码中的③)时,底层调用的就是Worker对象的run()方法。

Worker类的run()方法

run()方法调用的是runWorker()方法。

public void run() {
    runWorker(this);
}

Worker类的runWorker()方法

在runWorker()方法最重要的语句就是那一条while语句(下面代码中标识①的语句)。涉及如下三点:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {//①
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
           }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

自此,完成了任务到来,提交给线程池,线程池创建线程或者复用等待中的线程执行任务。复用线程可以减少创建和销毁线程产生的开销,在高并发系统中线程池使用可以取得很好的性能效果。更多细节可以参考Java线程池ThreadPoolExecutor的实现和参数

上一篇 下一篇

猜你喜欢

热点阅读