线程池

2021-10-09  本文已影响0人  lingfighting

线程池的使用和源码

相关api

创建线程池

向线程池中添加任务

调用ThreadPoolExecutor类的submit方法,给线程添加任务。submit的参数是一个函数式接口,调用方式:

Future<String> result = executorService.submit(new Runnable() {
    public void run() {
        .......
    }
});

lambda表达式允许你通过表达式来代替功能接口,同时Lambda表达式只能针对函数式接口(当接口里只有一个抽象方法的时候,就是函数式接口,可以使用@FunctionalInterfac注解限定,可以有其他方法,不在这里将)使用

结果获取

future.get();

线程池源码

设计到的相关类:

image

线程池状态转变

其中AtomicInteger变量ctl:低29位表示线程池中线程数,高3位表示线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 000 1111111111111111111111111111

// runState is stored in the high-order bits
// -1的二进制1111111111111111
private static final int RUNNING    = -1 << COUNT_BITS;// 111 00000000000000000000000000000 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
private static final int SHUTDOWN   =  0 << COUNT_BITS;//  000 00000000000000000000000000000 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
private static final int STOP       =  1 << COUNT_BITS;// 001 00000000000000000000000000000 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
private static final int TIDYING    =  2 << COUNT_BITS;// 010 00000000000000000000000000000 即高3位为010,  所有的任务都已经终止;
private static final int TERMINATED =  3 << COUNT_BITS;//011 00000000000000000000000000000 即高3位为011,  terminated()方法已经执行完成 

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; } // ~COUNT_MASK = 111 00000000000000000000000000000。最新版本没有使用
private static int workerCountOf(int c)  { return c & COUNT_MASK; } // COUNT_MASK = 000 1111111111111111111111111111
private static int ctlOf(int rs, int wc) { return rs | wc; }

任务执行的主要流程

线程池.jpg

主要函数

submit(Callable<T> task)方法

AbstractExecutorService.submit(task)实现了ExecutorService.submit(task)

execute –> addWorker –>runWorker (getTask)

execute(Runnable command)方法

ThreadPoolExecutor.execute(task)实现了Executor.execute(task)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1\. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2\. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3\. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  
    //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
       if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // double check: c, recheck
    // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 往线程池中创建新的线程失败,则reject任务
    else if (!addWorker(command, false))
        reject(command);
}

为什么需要double check线程池的状态? 在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

addWorker()方法 addWorker主要负责,创建新的线程并执行任务。 线程池创建新线程执行任务时,需要 获取全局锁:

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
       // CAS更新线程池数量
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)  // SHUTDOWN不会接收新任务,但会处理阻塞队列中的任务
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 线程池重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get(); // 之前的版本用的rs = runStateOf(c); 一下使用的rs做判断

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // workers主要用于线程池工作线程的统计和线程池状态变更时判断使用
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 线程启动,执行任务(Worker.thread(firstTask).start());
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker类的runworker方法

private final class Worker
         extends AbstractQueuedSynchronizer
         implements Runnable{
     Worker(Runnable firstTask) {
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         this.thread = getThreadFactory().newThread(this); // 创建线程
     }
     /** Delegates main run loop to outer runWorker  */
     public void run() {
         runWorker(this);
     }
     // ...
 }

 runWorker方法是线程池的核心: 
1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断; 
2. Worker执行firstTask或从workQueue中获取任务: 
2.1 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断) 
2.2 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。 
2.3 执行beforeExecute 
2.4 执行任务的run方法 
2.5 执行afterExecute方法 
2.6 解锁操作

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 先执行firstTask,再从workerQueue中取task(getTask())
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask方法

通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

       // Check if queue empty only if necessary.
       if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
       }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

keepAliveTime的作用:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。

提交任务

image

Callable负责产生结果,Future负责获取结果。

submit(Callable<T> task)方法

// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

// submit方法在AbstractExecutorService中的实现
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // newTaskFor返回一个FutureTask对象(class FutureTask<V> implements RunnableFuture<V>)
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

FutureTask对象 内部状态:

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        // this.state 是 volatile,对 volatile 字段的写入,存在一个 happen-before
        // 关系;也就是说,`this.state = NEW` 执行完毕时,`this.callable = callable`
        // 也保证已经写入
        this.state = NEW;       // ensure visibility of callable
    }

run方法

public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

set方法和setException方法

    // NEW -> COMPLETING -> NORMAL
    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

    // NEW -> COMPLETING -> EXCEPTIONAL
    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

get方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            // 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成或被取消,则直接返回;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            // 如果主线程被中断,则抛出中断异常
            else if (Thread.interrupted()) {
                // 把节点从 waiters 列表里移除
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 第一个循环,q == null,进入此处的判断证明 Callable 还未完成,所以会创建等待节点
            else if (q == null) {
                // 此处的 timed 传入为 false,不会在此返回
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // 第二个循环会执行下面这个语句,把 q 入队。将上一个判断条件中新建的 q 加入到链表的首节点中,并且 queued 变成 true
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    // 挂起线程
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                // 挂起线程
                LockSupport.park(this);
        }
    }

finishCompletion方法:当run方法里面,任务执行完成,LockSupport.unpark唤醒线程。当线程被释放后,那么在awaitDone的死循环中就会进入下一个循环,由于状态已经变成了NORMAL或者EXCEPTIONAL,将会直接跳出循环

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
上一篇 下一篇

猜你喜欢

热点阅读