技术杂谈

Java线程池分析

2019-02-16  本文已影响1人  香芋牛奶面包

引言

在并发编程中,我们经常会使用到线程池,当然我们也可以手动一个一个创建线程,那么为何我们还是推崇大家使用线程池进行并发编程呢?借用《Java并发编程的艺术》提到的来说一下使用线程池的优点有3个

ThreadPoolExecutor类

在日常使用中,大多数情况我们都会使用JDK提供的Executors去创建线程池,Executors利用工厂模式向我们提供了4种线程池实现方式,但是最新的阿里Java开发手册中明确说明了不建议使用Executors去创建线程池,原因是使用Executors创建线程池会使用很多默认值,默认使用的参数有时候是不合理,但是开发者往往会忽略。所以我们应该尽量使用ThreadPoolExecutor类来显示的创建线程池。

下面我们先来看下ThreadPoolExecutor类的继承结构

image.png
public interface ExecutorService extends Executor {
   void shutdown();
   boolean isShutdown();
   boolean isTerminated();
   boolean awaitTermination(long timeout, TimeUnit unit)
       throws InterruptedException;
   <T> Future<T> submit(Callable<T> task);
   <T> Future<T> submit(Runnable task, T result);
   Future<?> submit(Runnable task);
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException;
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                 long timeout, TimeUnit unit)
       throws InterruptedException;

   <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException;
   <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                   long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;
}

我们直接看ThreadPoolExecutor所提供的构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

现在我们来解释一下各个参数的含义

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

下面再简单再介绍一下ThreadPoolExecutor中几个重要的方法

通过源码分析 ThreadPoolExecutor

上文我们简单的分析了ThreadPoolExecutor,下面我们将根据源码来进一步分析ThreadPoolExecutor核心功能的实现

先从其核心方法execute开始解读

public void execute(Runnable command) {
      // 如果传入空对象 则抛出空指针异常
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    
    /**
     * 如果当前正在执行任务的线程数小于 corePoolSize
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /**
     * 如果当前正在执行任务的线程数大于corePoolSize,且workQueue未满
     */
    if (isRunning(c) && workQueue.offer(command)) {
            // 重复检查一次,防止正好此时线程池被shutdown
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /**
     * 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则使用        * maximumPoolSize参数创建线程,如果已经到了maximumPoolSize最大值,则启用拒绝      * 策略
     */
    else if (!addWorker(command, false))
        reject(command);
}

上面的代码简单总结一下就是

  1. 当前正在执行任务的线程数小于corePoolSize时,正常加入执行
  2. 当前正在执行任务的线程数大于corePoolSize,且workQueue未满时,则将任务加入等待队列workQueue
  3. 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则会临时扩充线程数,根据maximumPoolSize最大线程数值
  4. 如果当前正在执行的任务线程数大于maximumPoolSize 这时已经超出最大可承受的线程数值了,会启用拒绝策略,也就是上文所配置的四种策略之一,默认是抛出异常,加入任务失败

workQueue 任务队列

上文有提到任务队列,也就是在等待执行的任务队列。workQueue的本质是一种阻塞队列BlockingQueue<Runnable>,列举三种常用类型

  1. 基于数组的先进先出队列,此队列创建时必须指定大小
  2. 基于链表的先进先出队列
  3. 同步队列 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直阻塞

核心功能分析

上文中我们通过查看源码知道了新任务加入线程池的策略,下面我们继续往下看,再分析之前,我们可以先思考几个问题

  1. 任务等待队列是在何时被执行?
  2. 线程是如何实现重复利用的?,毕竟这是线程池最重要的功能了
  3. 空闲线程根据keepAliveTime参数是在哪里被回收的?

接下来我们继续通过源码来解读这三个问题
根据上文execute方法源码,可以看到调用了addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 这里判断当前线程数是否大于最大值,大于了则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
            // 新建了一个 Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把当前这个工作线程加入到 workers 集合中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                  // 开始执行此worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

经过上面的分析,可能大概会有疑惑,Worker对象是做啥的?下面我们就来看下Worker类的实现

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    
    ...
}

重点在runWorker方法中

线程池重复利用机制

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
            // 从任务队列中循环获取任务执行 getTask方法有可能会阻塞
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  // 空实现,可以通过继承`ThreadPoolExecutor`重写此方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                        // 执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

代码解析到这里,基本可以回答上文所提出的三个问题了

getTask 超时策略

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // workers大于corePoolSize,或则允许corePoolSize设置空闲超时时间
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // 当前线程数已经大于maximumPoolSize或则已经超时过一次,则直接返回null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
                 // 获取任务,如果timed为true,则等待一定时间(keepAliveTime)未返回的话,会返回null,如果timed未设置为true,则会一直阻塞,直到有数据
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

总结一下,只有当我们设置allowCoreThreadTimeOut(核心池线程空闲超时时间)或则当前线程数大于corePoolSize时,keepAliveTime机制才会生效

整个流程到这里大概就分析完了,下图基本绘制了线程池提交任务,执行任务的整个流程

image.png

Executors 中几个常用的线程池

虽然阿里Java开发规约中不建议我们使用Executors类直接创建线程池,但还是有必要简单介绍几个其中常用的方法

总结

本篇文章主要从对线程池的配置使用,以及源码实现做了分析,总体上比较全面的分析了Java线程池的实现。

博客原文地址戳这里

上一篇 下一篇

猜你喜欢

热点阅读