Java-多线程程序员

Java再回顾(1)—线程池回顾与思考

2020-06-10  本文已影响0人  史小豪

概述

本文是针对java线程池的回顾与思考,主要是围绕java线程池的思路梳理与知识总结。长文预警(写这篇真的累到爆炸)!!!

概念

什么是线程?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

相信大家对线程的概念都不会陌生。
实际上,在日常的开发过程中,也会遇到大量的使用多线程的场景,如异步计算、android新开线程进行耗时操作等。

如何实现线程?

通常,实现线程的主要方式有以下两种(当然还有实现callable等等...):
1.1继承Thread类(java.lang.Thread),实现run方法:

public class ThreadDemo1 extends Thread {
    public  void run(){
        //do something
    }
}

1.2实现Runnable接口(java.lang.Runnable),实现run方法:

public class ThreadDemo2 implements Runnable {
    public  void run(){
        //do something
    }
}

1.3两种启动方式的对比
-Thread占据了父类的名额,没有Runnable方便。因为java是单继承。
-Thread类本身也是实现了Runnable
如下图所示:

image.png
-Runnable启动需要Thread类的支持
如下图所示:
image.png
-Runnable更容易实现多线程中的资源共享
结论:更建议实现Runnable接口来创建线程

线程的启动?
通常都是new 、start二连...
最原始的做法是有一个任务就new一个线程..
然而,线程的创建是具有一定开销的,频繁地new线程可能会引起一系列的问题:占用过多资源导致死机、线程间没有互动无法完成协作、对线程缺乏管理.....
所以,java为我们引入了线程池——ThreadPoolExecutor,来统一的管理线程,方便线程的复用。

正文

ThreadPoolExecuto的体系

可能一百度线程池,会看到什么java四种线程池(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool),实际它们都是基于ThreadPoolExecutor,只不过参数不同。
我们先看一下ThreadPoolExecuto的体系:


ThreadPoolExecutor体系.png

先看一下Executor接口,如图所示:


image.png

它只包含了一个execute方法,实际任务的提交、线程池的关闭等由ExecutorService决定。而继承了ExecutorService的抽象类AbstractExecutorService则实现了一部分接口....这里就不赘述了,直接进入整体。

ThreadPoolExecuto的构造函数
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这好像多少有些吓人...
但是别怕,咱们慢慢来。
首先,细心的同学可能已经发现,前三个构造器实际都是调用了最后一个构造器,只不过参数有所差异。
所以,我们先结合系统的javaDoc从共有的参数来讲起:

corePoolSize

     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set

核心线程:在创建完线程池后,核心线程先不创建,接到任务后创建核心线程。核心线程即使空闲也依旧会保留在线程池中,除非设置了allowCoreThreadTimeOut。当allowCoreThreadTimeOut设置之后,那么核心线程超时后就会销毁。

maximumPoolSize

     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool

线程池最大数量:这个很好理解,根据字面意思就能明白。线程池最大数量=核心线程数量+非核心线程数。

keepAliveTime

     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.

非核心线程的超时时长:就是字面意思,如果非核心线程执行完任务空闲了,等待任务到来的时长超出这个则会回收

unit

* @param unit the time unit for the {@code keepAliveTime} argument

这里的unit是指keepAliveTime的计量单位,使用TimeUnit。
TimeUnit是一个枚举类型,包括微毫秒、微秒一直到天。

workQueue

 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.

workQueue:任务阻塞队列,默认情况下,任务添加进来会先交给核心线程执行,如果没有核心线程空闲,则加入到任务队列中等待执行,任务队列可以设置一个最大值,当达到最大值后则创建非核心线程执行任务。
常见的workQueue有五种:
1.SynchronousQueue:同步阻塞队列,不存储元素。一接收到任务,就提交给线程执行,如果无空闲线程,则会创建新的线程。SynchronousQueue的插入操作是阻塞的,每个插入操作必须要等到另一个线程调用移除操作。

image.png
如图所示,前面提到的四大线程池之一的CachedThreadPool,就使用了这个队列。由于一有新的任务,就要复用线程/新创建线程,所以如果maximumPoolSize设置太小,就会抛出异常。
下面手打一段非常简单的代码测试一下:
public class ThreadPoolTest {
    private static ExecutorService pool;

    public static void main(String[] args) throws InterruptedException {
        pool = new ThreadPoolExecutor(0, 2,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTest());
        }
    }
}
     class ThreadTest implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"跑跑跑");
        }
    }

异常抛出:


RejectedExecutionException.png

所以对于设置SynchronousQueue的pool,要注意maximumPoolSize的设置。如同上面的CachedThreadPool设置为maxsize,实际虽然不会抛出策略异常,但是可能会导致oom。

2.LinkedBlockingQueue
这个队列默认情况下是无界的,也就是说在未设置队列容量的情况下,队列容量是最大值,如下图所示。

image.png

一有任务到来,如果没有空闲/可创建的核心线程,任务就会被加入到这个队列之中去, 从而使得maximumPoolSize失去作用。
前面提到的四大线程池之一的FixedThreadPool就采用了这个队列。


image.png

3.DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

4.PriorityBlockingQueue:具有优先级的无界阻塞队列

5.ArrayBlockingQueue:用数组实现的有界阻塞队列。如果任务来临,核心线程数满了,队列满了,最大线程数也满了,就会出错或者执行饱和策略。

回归正题,介绍新的构造器参数。
ThreadFactory:创建线程的工厂,事实上大部分时候都不用管这个参数,这个参数是用来给线程配置信息的。

RejectedExecutionHandler:饱和(拒绝)策略
该策略是当线程数已满且都在工作中、队列也都满了的时候,所要采用的应对策略,它是线程池的一种保护机制。
拒绝策略共有下面四种:

  1. AbortPolicy:默认策略,表示无法处理新任务,并抛出RejectedExecutionException 异常 。
    具体实例就如本文开头测试的RejectedExecutionException图所示。
  2. CallerRunsPolicy:由调用者所在的线程进行处理,这种策略会提供简单的反馈控制机制。
  3. DiscardPolicy:丢弃任务,但不抛出异常。使用这种策略,可能会使得无法发现系统的异常状态。
  4. DiscardOldestPolicy:将队列头部的任务抛弃,然后重新提交新任务。
四大线程池

好了,终于进入喜闻乐见的四大线程池部分了。前面已经说过了,四大线程池是(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool)。
Java将创建这四种线程池的方法都放在了Executors这个工厂类之中(当然,能不用Executors是最好哈,阿里巴巴的开发规范就有一条明确说明不准使用Executors创建线程池,要使用ThreadPoolExecutor)。
当然了,我们之前也说过,实际上他们都是由ThreadPoolExecutor构造而成的。下面我们看一下Executors相关线程池的创造方法,你就会发现使用ThreadPoolExecutor会更有助于理解线程池而且很香哦。
下面我们逐一看一下:
1.CachedThreadPool
这是一个缓冲线程池,有任务就让空闲线程/创建新线程运行,空闲线程的超时时间为60秒。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.FixedThreadPool:
创建一个固定数目的可重用的线程池。这个采用了 LinkedBlockingQueue也就是上面我们说到的无界阻塞队列。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.ScheduledThreadPoolExecutor
定时线程池,没啥好讲的,看了前面的内容,相信聪明的你一下就明白辣。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

4.SingleThreadExecutor
一个核心线程,先进先出。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

总结

实际上,推荐ThreadPoolExecutor的使用来替代Executors,会更便于理解线程池,也能规避一些错误。要注意的就是线程池最大数量、饱和策略、阻塞队列的选择。

后续

天,写一篇关于线程池的文章的想法已经有一段时间了,一直懒于提笔,今天终于把债还上了.....


image.png
上一篇下一篇

猜你喜欢

热点阅读