Java WorldJavaIT相关

线程池工作机制与原理

2016-11-10  本文已影响1580人  安静点就睡吧

书接上文,<a href="http://www.jianshu.com/p/aa5884bcd032">Java线程池</a>。
接下来记录一下线程池的工作机制和原理

线程池的两个核心队列:

线程池的核心参数:

线程池的运行机制:
举个栗子。假如有一个工厂,工厂里面有10个人,每个工人同时只能做一件事情。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务时,如果还来任务,就把任务进行排队等待。
如果说新任务数目增长的速度远远大于工作做任务的速度,那么此时工厂的主管可能就需要采取补救措施了,比如重新招4个工人进来;然后就将任务分配给这4个刚招进来的工人处理。
如果说这14个工人做任务的速度还是不够,此时工厂主管就要考虑不再接受新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管就要考虑辞掉4个临时工了,只保持原来10个工人,比较额外的工人是需要花费的。
而这个栗子中永远等待干活的10个工人机制就是workerQueue。这个栗子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是说corePoolSize就是线程池的大小,maximumPoolSize在我看来就是一种线程池任务超过负荷的一种补救措施,即任务量突然过大时的一种补救措施。再看看下面图好好理解一下。工人永远在等待干活,就像workerQueue永远在循环干活一样,除非,整个线程池停止了。

线程池原理图

线程池里面的线程的时序图如下图所示:

线程的时序图

自定义线程池与ExecutorService

自定义线程池需要用到ThreadFactory,本节将通过创建一个线程的例子对ExecutorService及其参数进行详细讲解。

1.认识ExecutorService家族

ExecutorService家族成员如下所示:

ExecutorService家族

使用startUML画的,我是UML菜鸟,所以凑合着看下。

上图中主要元素说明如下:
Executor:线程池的顶级接口,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。
ExecutorService:真正线程池接口。这个接口继承了Executor接口,并声明了一些方法:
submit、invokeAll、invokeAny以及shutDown等。
AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。
ThreadPoolExecutor:ExecutorService的默认实现,继承了类AbstractExecutorService。
ScheduledExecutorService:与Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ScheduledThreadPoolExecutor:继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
Executors是个线程工厂类,方便我们快速地创建线程池。

2.利用ThreadFactory创建一个线程

java.util.concurrent.ThreadFactory提供了一个创建线程的工厂的接口。
ThreadFactory源码如下:

public interface ThreadFactory{
  @override
  public Thread newThread(Runnable r);
}

我们可以看到上面的接口类中有一个newThread()的方法,为此我们自己手动定义一个线程工厂类,有木有激动啊,呵呵,下面我们就手动写一个自己的线程工厂类吧!
MyThreadFactory.java

public class MyThreadFactory implements ThreadFactory{
  @Override
  public Thread newThread(Runnable r){
        return new Thread(r);
  }
}

上面已经创建好了我们自己的线程工厂类,但是啥都没有做,就是直接new了一个Thread就返回回去了,我们一般在创建线程的时候,都需要定义其线程的名字,因为我们在定义了线程的名字之后就能在出现问题的时候根据监视工具来查找错误的来源,所以我们来看下官方实现的ThreadFactory吧!
这个类在java.util.concurrent.Executors类中的静态类中DefaultThreadFactory

/**
*  The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory{
  private static final AtomicInteger poolNumber=new AtomicInteger(1);
  private final ThreadGroup group;
  private final AtomicInteger threadNumber=new AtomicInteger(1);
  private final String namePrefix;

  DefaultThreadFactory(){
    SecurityManager s=System.getSecurityManager();
    group=(s!=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
    namePrefix="pool-"+poolNumber.getAndIncrement()+"-thread-";
  }
  public Thread newThread(Runnable r){
      Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
      if((t.isDaemon())
          t.setDaemon(false);
      if(t.getPriority()!=Thread.NORM_PRIORITY)
          t.setPriority(Thread.NORM_PRIORITY);
      return t;
  }
}

3.了解线程池的拒绝策略(RejectExecutionHandler)

当调用ThreadPoolExecutor的execute方法时,而此时线程池处于一个饱和的状态,并且任务队列也已经满了那么就需要做丢弃处理,RejectExecutionHandler就是这样的一个处理接口类。
RejectExecutionHandler.java

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

在JDK里面有4中拒绝策略,如下图所示:

线程池拒绝策略

来看下源码吧:
AbortPolicy : 一言不合就抛异常的

   /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy:调用者所在线程来运行任务

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

DiscardOldestPolicy :丢弃队列里面最近的一个任务,并执行当前任务

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy : 不处理,直接丢弃

/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

思考问题:
为什么有任务拒绝的情况发生呢:
这里先假设有一个前提:线程池里面有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此,在任务队列长度有限的情况下,就会出现现任务的拒绝情况,需要一种策略来处理发生这种已满无法加入的情况。另外,在线程池关闭的时候,也需要对任务加入队列操作进行额外的协调处理。

4.ThreadPoolExecutor详解

ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要想透彻的了解Java线程池,必须先了解这个大BOSS,下面来看下其源码:
4种构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通过源码我们清楚的看到,最终构造函数调用了最后一个构造函数,后面的那个构造函数才是真正的构造函数,接下来研究一下参数。

所以想自定义线程池就可以从上面的几个参数入手。接下来具体看下代码,了解一下实现原理:

   // 默认异常处理机制
   private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
   //任务缓存队列,用来存放等待执行的任务
   private final BlockingQueue<Runnable> workQueue;
   //线程池的主要状态锁,对线程状态(比如线程大小、runState等)的改变都需要这个锁
   private final ReentrantLock mainLock = new ReentrantLock();
   //用来存放工作集
   private final HashSet<Worker> workers = new HashSet<Worker>();
   //volatile 可变变量关键字,写的时候用mainLock做锁,读的时候无锁,高性能
   private volatile long keepAliveTime;
   //是否允许核心线程超时
   private volatile boolean allowCoreThreadTimeOut;
   //核心线程数量
   private volatile int corePoolSize;
   //线程最大线程数量
   private volatile int maximumPoolSize;
   //任务拒绝策略
   private volatile RejectedExcutionHandler handler;

结合之前的知识,大概就能猜出里面是怎么实现的了,具体可以参考一下JDK的源代码,这样我们就能做到了解原理又会用了。

5.自定义实现一个简单的Web请求连接池

我们来自定义一个简单的Web请求线程池。模仿Web服务的需求场景说明如下:

public class MyExecutors extends Executors{
    //利用默认线程工厂和PriorityBlockingQueue队列机制,当然了,我们还可以自定义ThreadFactory和继承queue进行自定义扩展
   public static ExecutorService newMyWebThreadPool(int minSpareThreads,int maxThreads,int maxIdleTime){
    return new ThreadPoolExecutor(minSpareThread,maxThreads,maxIdleTime,TimeUnit.MILLISECONDS,
          new PriorityBlockingQueue<Runnable>());
  }
}

6.线程池在工作中的错误使用

上一篇下一篇

猜你喜欢

热点阅读