理论Java多线程

线程池

2020-03-04  本文已影响0人  编程的猫

什么是线程池?

使用线程池的好处:

ThreadPoolExecutor

/* 
*@ ThreadPoolExecutor构造参数介绍 
*@author SEU_Calvin 
* @date 2016/09/03 
*/  
public ThreadPoolExecutor(  
//核心线程数,除非allowCoreThreadTimeOut被设置为true,否则它闲着也不会死  
int corePoolSize,   
//最大线程数,活动线程数量超过它,后续任务就会排队                     
int maximumPoolSize,   
//超时时长,作用于非核心线程(allowCoreThreadTimeOut被设置为true时也会同时作用于核心线程),闲置超时便被回收             
long keepAliveTime,                            
//枚举类型,设置keepAliveTime的单位,有TimeUnit.MILLISECONDS(ms)、TimeUnit. SECONDS(s)等  
TimeUnit unit,  
//缓冲任务队列,线程池的execute方法会将Runnable对象存储起来  
BlockingQueue<Runnable> workQueue,  
//线程工厂接口,只有一个new Thread(Runnable r)方法,可为线程池创建新线程  
ThreadFactory threadFactory)

ThreadPoolExecutor的各个参数所代表的特性注释中已经写的很清楚了,那么ThreadPoolExecutor执行任务时的心路历程是什么样的呢?(以下用currentSize表示线程池中当前线程数量)

RejectedExecutionHandler:饱和策略。

饱和策略有四种:

四种常用的线程池

首先来看看这四种线程池的适用场景:

通过直接或间接的配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor

FixedThreadPool是可重用的固定线程数的线程池,在Executors类中提供了创建FixedThreadPool的方法:
public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,
                                                        0L,TimeUnit.MILLISECONDS,
                                                        new LinkedBlockingQueue<Runnable>());
}

由图分析可知:
当执行execute方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程超过corePoolSize时,就将任务存储在任务队列中。当线程池有空闲线程时,则从任务队列中去取任务执行。

CachedThreadPool是一个根据需要创建线程的线程池。线程数量原则上不设上限

在Executors类中创建CachedThreadPool的方法如下:

public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                                        60L,TimeUnit.SECONDS,
                                                        new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,意味着CachedThreadPool没有核心线程,非核心线程是无界的。KeepAliveTime设置为60L,则空闲线程等待新任务的最长时间为60s。在此用了阻塞队列SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。


CachedThreadPool线程池执行流程图

由图分析可知,当执行execute方法时,首先会执行SynchronousQueue的offer方法来提交任务,并查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理。如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行SynchronousQueue的poll方法,等待SynchronousQueue中新提交的任务。如果超过了60s没有新任务提交到SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize是无界的,所以如果提交的任务大于线程池中线程处理任务的速度就会不断的创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThreadPool比较适于大量的需要立即处理并且耗时较少的任务。

SingleThreadExecutor是使用单个工作线程的线程池,其创建代码如下
public static ExecutorService newSingleThreadExecutor{
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,
                                                                                    0L,TimeUnit.MILLISECONDS,
                                                                                    new LinkedBlockingQueue<Runnable>()));
}

corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都和FixedThreadPool一样。


SingleThreadExecutor执行的流程示意图

由图分析,可知:
当执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。

ScheduledThreadPool是一个能实现定时和周期性任务的线程池。创建代码如下:
public staic ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它主要用于给定延时以后运行的任务或者定期处理任务。ScheduledThreadPoolExecutor的构造方法如下:

public ScheduledThreadPoolExecutor(int corePoolSize){
    super(corePoolSize,Integer.MAX_VALUE,
                DEFAULT_KEEPALIVE_MILLIS,MILLISECONDS,
                new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor的构造方法最终调用的是ThreadPoolExecutor的构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。


ScheduledThreadPoolExecutor的execute方法的执行示意图

由图分析,可知:
当执行ScheduledThreadPoolExecutor的scheduleAtFixedRate或者scheduleWithFixDelay方法时,会向DelayedWorkQueue添加一个实现RunnableScheduledFuture接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动它,但不是立即去执行任务,而是去DelayedWorkQueue中取出ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。其和上面介绍的几个线程池不同的是,当执行完任务后,会将ScheduledFutureTask的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。

线程池为什么要用(阻塞)队列?

线程池为什么要使用阻塞队列而不使用非阻塞队列?

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。

线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下

while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已

如何选型线程池

execute()和submit()方法

创建线程池示例,这里没有添加自定义的饱和策略

public class TgExecutor {

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE = 1;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "TgThread #" + mCount.getAndIncrement());
        }
    };
    //阻塞队列   capacity阻塞队列的容量
    public static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
    
    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    private static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
            TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    public static Executor getExecutor() {
        return THREAD_POOL_EXECUTOR;
    }

}

参考相关博客:
https://www.cnblogs.com/1925yiyi/p/9040605.html
https://blog.csdn.net/qq_39969226/article/details/88141264

上一篇下一篇

猜你喜欢

热点阅读