线程池一些收藏收藏

线程池

2022-03-04  本文已影响0人  程序员札记

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。

线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,从概念以及应用场景中,我们可以看出,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。

那么,我们应该如何创建一个线程池那?Java中已经提供了创建线程池的一个类:Executor

而我们创建时,一般使用它的子类:ThreadPoolExecutor

 public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
            BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory,
            RejectedExecutionHandler paramRejectedExecutionHandler) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
            throw new IllegalArgumentException();
        if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
            throw new NullPointerException();
        this.corePoolSize = paramInt1;
        this.maximumPoolSize = paramInt2;
        this.workQueue = paramBlockingQueue;
        this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
        this.threadFactory = paramThreadFactory;
        this.handler = paramRejectedExecutionHandler;
    }

下面一张图来更好的理解线程池和这几个参数:

image.png

(1)如果当前运行的线程少于corePoolSize,则会创建新的线程来执行新的任务,即使线程池中的其他线程是空闲的;
(2)如果运行的线程个数等于或者大于corePoolSize且小于maximumPoolSize,则会将提交的任务存放到阻塞队列workQueue中;
(3)如果当前workQueue队列已满的话,则会创建新的线程来执行任务;
(4)如果线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理增量的任务

ThreadPoolExecutor

image.png

Executor 框架结构(主要由三大部分组成)

执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

Executor包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

注意: 通过查看 ScheduledThreadPoolExecutor 源代码我们发现 ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService,正如我们下面给出的类关系图显示的一样。ThreadPoolExecutor 类描述:

//AbstractExecutorService实现了ExecutorService接口

public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 类描述:

//ScheduledExecutorService实现了ExecutorService接口

public class ScheduledThreadPoolExecutor

        extends ThreadPoolExecutor

        implements ScheduledExecutorService

问题:ScheduledThreadPoolExecutor 是什么设计模式

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable <T> task))。
  3. 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。


    image.png

线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
分析。以下是ThreadPoolExecutor状态控制的主要变量和方法:

    //原子状态控制数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //29比特位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //实际容量 2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // runState存储在高位中
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Packing and unpacking ctl 打包和解压ctl

    // 解压runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 解压workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 打包ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池使用一个AtomicInteger的ctl变量将 workerCount(工作线程数量)和 runState(运行状态)两个字段压缩在一起 ,这种做法在在java源码里经常有出现,如在 ReentrantReadWriteLock 里就将一个int分成高16位和底16位,分别表示读锁状态和写锁状态。ThreadPoolExecutor里也是使用了同样的思想,表现得更加复杂。

ThreadPoolExecutor用3个比特位表示runState, 29个比特位表示workerCount。因此这里需要特别说明的是:确切的说,当最大线程数量配置为Integer.MXA_VAULE时,ThreadPoolExecutor的线程最大数量依然是2^29-1。
目前来看这是完全够用的,但随着计算机的不断发展,真的到了不够用的时候可以改变为AtomicLong。这如同32位系统时间戳会在2038年01月19日03时14分07秒耗尽一样,当以后我们的系统线程能够超过2^29-1时,这些代码就需要调整了。对于未来,无限可能。

思考一下为什么是29:3呢?
这是因为我们的运营状态有5种,向上取2次方数,2^3 = 8。所以必须要3个比特位来表示各种状态。

运行状态解释:

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

   /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到。
ThreadPoolExecutor 3 个最重要的参数:

ThreadPoolExecutor其他常见参数:

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

ThreadPoolExecutor 提交任务

public void execute(Runnable command)
public <T> Future<T> submit(Callable<T> task)
public Future<?> submit(Runnable task) 
//执行集合中所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
//提交所有任务,返回第一个执行完的task的执行结果,其他的task结束。
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
上一篇下一篇

猜你喜欢

热点阅读