Java基础 -- 线程池

2019-09-28  本文已影响0人  tom_xin

为什么需要线程池

    在生产环境中,为每个任务都分配一个线程,这种方法存在一些缺陷,尤其是当需要创建大量的线程时:
    线程生命周期开销非常高:线程的创建与销毁都会需要JVM和操作系统参与提供一些辅助操作。在高并发的场景下,是非常消耗CPU计算资源的。
    资源消耗:活跃的线程是会占用系统内存资源的,大量的线程对系统内存的消耗非常严重。
    稳定性:线程数量在一定范围内是会提高系统资源利用率的,如果超出了这个限制,很可能会发生一些意向不到的问题,例如OOM。
    线程池的出现很好的解决了这些问题,它将提交任务执行任务分开(生产者-消费者模式),通过配置参数限制了线程池中最大可允许创建线程的数量。


线程池的基本概念

Integer corePoolSize: 线程池的最大空闲线程数,但变量allowCoreThreadTimeOut同样可以设置在一定时间内回收这些线程。
Integer maximumPoolSize:线程池最大的活跃线程数。
long keepAliveTime:当线程池内的线程数大于corePoolSize指定的线程数时,可以通过keepAliveTime来指定
TimeUnit unit:keepAliveTime的时间单位(时,分,秒,毫秒)
BlockingQueue<Runnable> workQueue:该队列用于暂存因为空闲线程导致没有执行的任务。
ThreadFactory threadFactory:线程池中的线程是通过ThreadFactory创建的。
RejectExecutionHandler handler:当BlockingQueue(阻塞队列)的暂存的任务数达到BlockingQueue容量的上限,并且此时暂无可用线程时,会执行该拒绝策略。
Worker类:提交到线程池中的任务都会被封装为一个Worker对象,并分配一个线程。在执行任务时会调用Worker类的run()方法。Worker类实现了Runnable的run方法,继承了AbstractQueueSynchronizer类,重写了tryAcquire,tryRelease方法,控制线程在运行过程中的interrupt操作,提供了同步机制。


线程池的执行流程

image.png

常见线程池的使用方式

Executor框架

    FixThreadPool
    Executor框架提供了两个创建FixThreadPool线程池的方法,newFixThreadPool(int nThreads),newFixThreadPool(int nThreads,ThreadFactory threadFactory)。FixThreadPool限制了线程池中线程的数量。
    SingleThreadPool
    只有一个线程的线程池,提交到线程池中的任务会按照他们的提交顺序来执行。
    CachedThreadPool
    在程序执行过程中会创建与所需数量相同的线程,然后再它回收旧线程时停止创建新线程。
    ScheduledThreadPoolExecutor
    通过schedule(Runnable command, long delay, TimeUnit unit)方法,向ScheduledThreadPoolExecutor中提交任务时,可以设置延迟时间。设置延迟时间的任务会在一段时间后执行。

使用线程池时需要注意哪些事项

  1. 合理设置核心线程数大小
        在设置线程池中线程的数量时,只要数量不是“过大”或者“过小”都可以,在设置完成后,需要充分的自测。在《Java并发编程实战》中给出了核心线程数的计算公式。
    线程池的最优大小 = CPU核数 * CPU的期望利用率 * (1 + 线程等待时间/线程执行时间)

    image.png
  2. 不同职责的线程池区分
         项目中使用线程池时需要根据自己的应用场景来选择合适的线程池。不过在阿里编写的《阿里巴巴Java开发手册》中建议尽量使用ThreadPoolExecutor来构造适合自己的线程池。

  3. 选择适当的拒绝策略
         我在另外一篇文章中有介绍 https://www.jianshu.com/p/5f70fbd70b84

  4. 避免使用ThreadLocal
        线程池中线程的生命周期通常会超过任务的生命周期,所以ThreadLocal中以Thread ID作为Key来存储缓存可能会出现问题。


日常工作中的使用

需求:统计线程池中的任务一段时间内请求外部接口的次数
实现:可以利用ThreadPoolExecutor中的beforeExecute,afterExecute,terminated方法来实现这个要求。
具体代码如下。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class StatisticThreadFactoryPool extends ThreadPoolExecutor {

    private final static ThreadLocal<Long> threadLocal = new ThreadLocal<>();

    private final static List<StatisticDO> statisticDOS = new ArrayList<>(16);


    public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * 在任务执行前,记录当前而任务执行的开始时间
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 执行父类的方法
        super.beforeExecute(t, r);
        // 存储当前的时间
        threadLocal.set(System.nanoTime());
    }

    /**
     *  在任务结束后,将统计信息保存到StatisticDO对象中。
     *  记录任务的开始时间
     *  记录任务的结束时间
     *  计算执行时间
     *  保存当前任务的标识
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            StatisticDO statisticDO = new StatisticDO();
            statisticDO.setStartTime(threadLocal.get());
            statisticDO.setEndTime(System.nanoTime());
            statisticDO.setRunTime(statisticDO.getEndTime() - statisticDO.getStartTime());
            statisticDO.setTask("thread:" + Thread.currentThread() + r.toString());
            statisticDOS.add(statisticDO);
        } finally {
            super.afterExecute(r, t);
        }
    }

    /**
     *  在线程池的任务结束后,将统计信息放入redis缓存中,方便后续调用查询。
     *  
     */
    @Override
    protected void terminated() {
        try {
            // 将statisticDOS中的数据写入缓存中:redis。方便后面查询统计使用
            threadLocal.remove();
        } finally {
            super.terminated();
        }
    }

    class StatisticDO {
        // 任务运行时间
        private long runTime;
        // 任务开始时间
        private long startTime;
        // 任务结束时间
        private long endTime;
        // 任务标识
        private String task;


        public long getRunTime() {
            return runTime;
        }

        public void setRunTime(long runTime) {
            this.runTime = runTime;
        }

        public long getStartTime() {
            return startTime;
        }

        public void setStartTime(long startTime) {
            this.startTime = startTime;
        }

        public long getEndTime() {
            return endTime;
        }

        public void setEndTime(long endTime) {
            this.endTime = endTime;
        }

        public String getTask() {
            return task;
        }

        public void setTask(String task) {
            this.task = task;
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读