JAVA基础知识Java学习笔记

详解Java中ThreadPool的使用

2017-07-04  本文已影响210人  Acamy丶

线程池就是限制系统中执行线程的数量。我们可以根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

为什么要用线程池:

1.线程池继承体系

如下图所示,Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

2.Executors

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

2.1 newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

源码:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

实例

public classMyThread extends Thread {
    @Override
    publicvoid run() {
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");
    }
}
publicclassTestSingleThreadExecutor {
    publicstaticvoid main(String[] args) {
        //创建一个可重用固定线程数的线程池
        ExecutorService pool = Executors. newSingleThreadExecutor();
        //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        //将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        //关闭线程池
        pool.shutdown();
    }
}

运行结果:

pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
2.2 newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

源码:

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

实例

publicclass TestFixedThreadPool {
    publicstaticvoid main(String[] args) {
        //创建一个可重用固定线程数的线程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        //将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        //关闭线程池
        pool.shutdown();
    }
}

运行结果:

pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
2.3 newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

源码:

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

实例

publicclass TestCachedThreadPool {
    publicstaticvoid main(String[] args) {
        //创建一个可重用固定线程数的线程池
        ExecutorService pool = Executors.newCachedThreadPool();
        //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        //将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        //关闭线程池
        pool.shutdown();
    }
}

运行结果:


pool-1-thread-2正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-5正在执行。。。
2.4 newScheduledThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

源码:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

实例

publicclass TestScheduledThreadPoolExecutor {
    publicstaticvoid main(String[] args) {
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
        exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异常
                      @Override
                      publicvoid run() {
                           //throw new RuntimeException();
                           System.out.println("================");
                      }
                  }, 1000, 5000, TimeUnit.MILLISECONDS);
        exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印系统时间,证明两者是互不影响的
                      @Override
                      publicvoid run() {
                           System.out.println(System.nanoTime());
                      }
                  }, 1000, 2000, TimeUnit.MILLISECONDS);
    }
}

运行结果:

================
8384644549516
8386643829034
8388643830710
================
8390643851383
8392643879319
8400643939383

3.ThreadPoolExecutor

从上面Executors类创建几种线程池的源码中可以发现,底层调用了ThreadPoolExecutor类的构造方法。下面就具体的看一看这个类的相关源码。

3.1 构造方法
 /**
     * 用给定的参数创建一个线程池
     *
     * @param corePoolSize 池中所保存的线程数,包括空闲线程。
     * @param maximumPoolSize 池中允许的最大线程数。
     * @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线 程等待新任务的最长时间。
     * @param unit 参数的时间单位。
     * @param workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的Runnable任务。
     * @param threadFactory 执行程序创建新线程时使用的工厂。
     * @param handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
3.2.execute方法

execute方法里面的代码逻辑对应着用户提交一个线程时的执行流程,如下流程图。

源码:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
3.3 BlockingQueue<Runnable> 参数

线程池的实现,队列起着至关重要的作用。它也是ThreadPoolExecutor构造方法里面的参数之间。从上面的excute执行逻辑知道当核心线程已满,如果等待队列未满,我们就可以把线程任务放到队列中去,当线程池中有空闲线程就会执行该任务。

3.3.1 类图:
ArrayBlockingQueue--声明时就确定大小的队列,fifo方式。(方法基本和接口一致,没有特别要说明的内容)
LinkedBlockingQueue--链表实现的queue-remove效率会高一些
PriorityBlockingQueue--优先级队列
SynchronousQueue--阻塞队列,必须拿走一个才能放进来一个,也就是最多只有一个~
DelayQuque--就是放进去的内容,延迟时间到了后才可以获得
LinkedBlockDeque--双端队列 :offerFirst/offerLast,pollFirst/pollLast
LinkedTransferQueue--类似LinkedUnBlockedQueue,其实就是transfer方法有人再等待队列内容就直接给他这个元素,没人在等就放在队列里面。也就是效率会更高。
3.3.2 排队的三种通用策略:
3.3.3 BlockingQueue的选择。
new ThreadPoolExecutor(
  2, 3, 30, TimeUnit.SECONDS,
  new SynchronousQueue<Runnable>(),
  new RecorderThreadFactory("CookieRecorderPool"),
  new ThreadPoolExecutor.CallerRunsPolicy());

当核心线程已经有2个正在运行.
此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中。

new ThreadPoolExecutor(
    2, 4, 30, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(2),
    new RecorderThreadFactory("CookieRecorderPool"),
    new ThreadPoolExecutor.CallerRunsPolicy());

假设,所有的任务都永远无法执行完。
对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queue中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

3.4 keepAliveTime 参数

由上可知,ThreadPoolExecutor中额定的“工人”数量由corePoolSize决定,当任务数量超过额定工人数量时,将任务缓存在BlockingQueue之中,当发现如果连queue中也放不下时(可能是因为使用有界queue,也可能是使用SynchronousQueue),ThreadPoolExecutor会请求“老板”再派几个“工人”过来。

接下来发生的事情有两种情况:

对于keepAliveTime参数,jdk中的解释是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

有点拗口,其实这个不难理解,在使用了“池”的应用中,大多都有类似的参数需要配置。比如数据库连接池,DBCP中的maxIdle,minIdle参数。

什么意思?接着上面的解释,后来向老板派来的工人始终是“借来的”,俗话说“有借就有还”,但这里的问题就是什么时候还了,如果借来的工人刚完成一个任务就还回去,后来发现任务还有,那岂不是又要去借?这一来一往,老板肯定头也大死了。

合理的策略:既然借了,那就多借一会儿。直到“某一段”时间后,发现再也用不到这些工人时,便可以还回去了。这里的某一段时间便是keepAliveTime的含义,TimeUnit为keepAliveTime值的度量。

总结:

keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

反之,如果核心数较小,有界BlockingQueue数值又较小,同时keepAliveTime又设的很小,如果任务频繁,那么系统就会频繁的申请回收线程。

3.5 RejectedExecutionHandler

即拒绝策略,当即使向老板借了工人,但是任务还是继续过来,还是忙不过来,这时整个队伍只好拒绝接受了。
RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。

3.5 .1 CallerRunsPolicy

线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

 public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。

3.5 .2 AbortPolicy

处理程序遭到拒绝将抛出运行时 RejectedExecutionException

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

这种策略直接抛出异常,丢弃任务。

3.5 .3 DiscardPolicy

不能执行的任务将被删除

  public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

3.5 .4 DiscardOldestPolicy

如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务。

上一篇下一篇

猜你喜欢

热点阅读