Java之线程池

2020-02-13  本文已影响0人  zhglance

1.Executors

Executors提供的常用线程池:
线程池的构造方法具体如下:


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

参数说明:

a.corePoolSize: 线程池中常驻线程数;
b.maximumPoolSize: 线程池能够容纳的最大线程数,当等待队列满了,才增大存活线程,最大为maximumPoolSize;
**c.keepAliveTime: **多余空闲线程存活的时间,如果当线程池中线程数量超过corePoolSize时,当有空闲线程的时间超过keepAliveTime时间时,就会将多余的空闲线程销毁,只保留corePoolSize线程;
d.unit: keepAliveTime的单位;
e.workQueue: 任务队列,被提交但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
**f. threadFactory: **线程工厂,用于创建线程,一般用默认即可;
**g. handler: ** 拒绝策略,当任务太多来不及处理时,如何拒绝任务,主要有AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,NewThreadRunsPolicy等.

1.1 newFixedThreadPool线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool指定常驻线程数,线程池中的线程数是固定的,但是由于使用了阻塞new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newFixedThreadPool使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。

1.2 newSingleThreadExecutor线程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor指定常驻线程数为1,线程池中的线程数是固定的1,,但是由于使用了new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newSingleThreadExecutor使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。

1.3 newCachedThreadPool线程池

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool可以动态的调整线程池线程数,核心线程数被设置为0,最大线程数设置为Integer.MAX_VALUE,使用有界阻塞队列SynchronousQueue。由于最大线程数几乎无上限,当主线程提交的速度高于线程池处理的速度的时候,newCachedThreadPool会不断的创建新的线程,极端情况下会导致CPU和内存资源耗尽。

1.4 newScheduledThreadPool周期性线程池

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

newScheduledThreadPool可以指定延迟时长和周期性执行,使用无界队列DelayQueue。

1.5 newWorkStealingPool(jdk8新增,ForkJoinPool的扩展)

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

具体原理待完成

1.6 阻塞队列:

阻塞队列:
a.阻塞添加 当队列满了的时候插入队列,会使线程阻塞,当队列不再满时,再唤醒线程执行插入队列;
b.阻塞删除 当队列为空的时候执行删除,删除线程将会被阻塞,直到队列不再为空的时候再唤起删除线程。

1.7 线程池的拒绝策略:

1.AbortPolicy 默认策略,即丢弃任务,抛出异常;

  1. DiscardPolicy 丢弃任务,不抛出异常;
  2. DiscardOldestPolicy 丢弃队列中等待最久的任务,并执行当前任务;
  3. CallerRunsPolicy 当队列满时,新线程提交的直接在主线程中执行(不在线程池中执行),相当于submit被阻塞,后续提交的任务被阻塞,只有主线程执行提交的线程之后,后续的线程才会被提交到线程池中。

1.8 线程池submit()和execute()的区别

execute()方法无返回值
submit() 可以有返回值,如使用Future接收返回值。

1.9 线程池可以catch到线程的Exception吗

答案:不能

1.10 线程池中线程的回收

1.当从等待队列中获取不到线程的时候(即返回null),就会触发线程尝试回收,当线程数大于核心线程数,且线程空闲时间超过设置的空闲时间阈值,那么就会使用CAS乐观锁触发销毁线程操作,避免并发操作导致保留的线程数小于核心线程数。
2.当线程池执行了shutdown时,拒绝接受新提交的线程,判断线程池中的线程是否处于空闲状态,如果处于空闲状态则使用CAS回收,否则等待执行完毕再回收。
3.当线程池执行了shutdownNow时,拒绝接受新提交的线程,同时立马关闭线程池(不使用CAS),线程池里的任务不再执行。

线程池个数的选择:

1.计算密集型任务

线程数 = CPU个数 + 1

加1的原因是,不会因为某个线程异常或者暂停而导致CPU在执行周期内轮空;

2.对于IO型或者是阻塞类型

线程数 = CPU个数 * 2
或者
线程数 = CPU个数 * CPU利用率 (0~1之间)* (1 + (等待时间 / 计算时间))

对于这种情况比较麻烦,需要考虑的因素很多,如线程要链接数据库时,那么数据库连接池可用的连接数限制了线程的个数大小。

上一篇下一篇

猜你喜欢

热点阅读