ReadyJava开发那些事性能排查

如何优雅的使用线程池

2019-12-17  本文已影响0人  不学无数的程序员

线程池不仅在项目中是非常常用的一项技术而且在面试中基本上也是必问的知识点,接下来跟着我一起来巩固一下线程池的相关知识。在了解线程池之前我们先了解一下什么是进程什么是线程

进程

用户下达运行程序的命令以后,就会产生一个进程,同一个程序可以产生多个进程(一对多的关系),以允许同时有多个用户运行同一个程序,却不会相冲突。

进程需要一些资源才能工作,如CPU的使用时间、存储器、文件、以及I/O设备,且为依序逐一执行,也就是每个CPU核心任何时间内仅能运行一项进程。但是在一个应用程序中一般不会是只有一个任务单条线执行下去,肯定会有多个任务,而创建进程又是耗费时间和资源的,称之为重量级操作。

  1. 创建进程占用资源太多
  2. 进程之间的通信需要数据在不同的内存空间传来传去,所以进程间通信会更加耗费时间和资源

线程

线程是操作系统能够进行运算调度的最小单位,大部分情况下它被包含在进程之中,是进程中实际的运作单位。一个进程可以并发多个线程,每个线程执行不同的任务。同一个进程中的多条线程共享该进程中的全部虚拟资源,例如虚拟地址空间、文件描述符、信号处理等等。但是同一个进程中的多个线程各自有各自的调用栈。

一个进程可以有很多线程,每条线程并行执行不同的任务。

线程中的数据

  1. 线程栈上的本地数据:比如函数执行过程的局部变量,我们知道在Java中线程模型是使用栈的模型。每个线程都有自己的栈空间。
  2. 在整个进程里共享的全局数据:我们知道在Java程序中,Java就是一个进程,我们可以通过ps -ef | grep java可以看到在程序中运行了多少个Java进程,例如我们Java中的全局变量,在不同进程之间是隔离的,但是在线程之间是共享的。
  3. 线程的私有数据:在Java中我们可以通过ThreadLocal来创建线程间私有的数据变量。

线程栈上的本地数据只能在本方法内有效,而线程的私有数据是在线程间多个函数共享的。

CPU密集型和IO密集型

理解是服务器是CPU密集型还是IO密集型能够帮助我们更好的设置线程池中的参数。具体如何设置我们在后面讲到线程池的时候再分析,这里大家先知道这两个概念。

线程池

线程池其实是池化技术的应用一种,常见的池化技术还有很多,例如数据库的连接池、Java中的内存池、常量池等等。而为什么会有池化技术呢?程序的运行本质,就是通过使用系统资源(CPU、内存、网络、磁盘等等)来完成信息的处理,比如在JVM中创建一个对象实例需要消耗CPU的和内存资源,如果你的程序需要频繁创建大量的对象,并且这些对象的存活时间短就意味着需要进行频繁销毁,那么很有可能这段代码就成为了性能的瓶颈。总结下来其实就以下几点。

所以池化技术就是为了解决我们这些问题的,简单来说,线程池就是将用过的对象保存起来,等下一次需要这个对象的时候,直接从对象池中拿出来重复使用,避免频繁的创建和销毁。在Java中万物皆对象,那么线程也是一个对象,Java线程是对于操作系统线程的封装,创建Java线程也需要消耗操作系统的资源,因此就有了线程池。但是我们该如何创建呢?

Java提供的四种线程池

Java为我们提供了四种创建线程池的方法。

线程池的创建原理

我们点击去这四种实现方式的源码中我们可以看到其实它们的底层创建原理都是一样的,只不过是所传的参数不同组成的四个不同类型的线程池。都是使用了ThreadPoolExecutor来创建的。我们可以看一下ThreadPoolExecutor创建所传的参数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

那么这些参数都具体代表什么意思呢?

接下来我们将这些参数合起来看一下他们的处理逻辑是什么。

  1. corePoolSize个任务时,来一个任务就创建一个线程
  2. 如果当前线程池的线程数大于了corePoolSize那么接下来再来的任务就会放入到我们上面设置的workQueue队列中
  3. 如果此时workQueue也满了,那么再来任务时,就会新建临时线程,那么此时如果我们设置了keepAliveTime或者设置了allowCoreThreadTimeOut,那么系统就会进行线程的活性检查,一旦超时便销毁线程
  4. 如果此时线程池中的当前线程大于了maximumPoolSize最大线程数,那么就会执行我们刚才设置的handler拒绝策略

为什么建议不用Java提供的线程池创建方法

理解了上面设置的几个参数以后,我们再来看一下为什么在《阿里巴巴Java手册》中有一条这样规定。

image

相信大家看到上面提供四种创建线程池的实现原理,应该知道为什么阿里巴巴会有这么规定了。

如何设置参数

所以我们在项目中如果要使用线程池的话,那么就推荐根据自己项目和机器的情况进行个性化创建线程池。那么这些参数如何设置呢?为了正确的定制线程池的长度,需要理解你的计算机配置、所需资源的情况以及任务的特性。比如部署的计算机安装了多少个CPU?多少的内存?任务主要执行是IO密集型还是CPU密集型?所执行任务是否需要数据库连接这样的稀缺资源?

如果你有多个不同类别的任务,它们的行为有很大的差别,那么应该考虑使用多个线程池。这样也能根据每个任务不同定制不同的线程池,也不至于因为一种类型的任务失败而托垮另一个任务。

页缺失(英语:Page fault,又名硬错误、硬中断、分页错误、寻页缺失、缺页中断、页故障等)指的是当软件试图访问已映射在虚拟地址空间中,但是当前并未被加载在物理内存中的一个分页时,由中央处理器的内存管理单元所发出的中断

其实线程池大小的设置还是要根据自己业务类型来设置,比如当前任务需要池化的资源的时候,比如数据库的连接池,俺么线程池的长度和资源池的长度会相互的影响。如果每一个任务都需要一个数据库连接,那么连接池的大小就会限制了线程池的有效大小,类似的,当线程池中的任务是连接池的唯一消费者时,那么线程池的大小反而又会限制了连接池的有效大小。

线程池中的线程销毁

线程池的核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程的存活时间(keepAliveTime)共同管理的线程的创建与销毁。接下来我们再复习一下线程池是如何创建和销毁线程的

那么这里可能有人会想到将corePoolSize核心线程数设置为0(如果大家还记得上面讲的CachedThreadPool的话应该还会记得它的核心线程数就是0),因为这样设置的话线程就会动态的进行创建了,闲的时候没有线程,忙的时候再在线程池中创建线程。这样想法固然是好,但是如果我们自定义参数设置了此参数为0,而正好又设置了等待队列不是SynchronousQueue,那么其实就会有问题,因为只有在队列满的情况下才会新建线程。下面代码我使用了无界队列LinkedBlockingQueue,其实大家看一下输出

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("1");
        }
    });
}

大家可以看一下演示的效果,其实1是每隔一秒打印一次,其实这就和我们使用线程池初衷相悖了,因为我们这个相当于是单线程在运行。

image

但是如果我们将工作队列换成SynchronousQueue呢,我们发现这些1是一块输出出来的。

image

SynchronousQueue并不是一个真正的队列,而是一种管理直接在线程间移交信息的机制,这里可以简单将其想象成一个生产者生产消息交给SynchronousQueue,而消费者这边如果有线程来接收,那么此消息就会直接交给消费者,反之会阻塞。

所以我们在设置线程池中某些参数的时候应该想想其创建和销毁线程流程,不然我们自定义的线程池还不如使用Java提供的四种线程池了。

线程池中的拒绝策略

ThreadPoolExecutor为我们提供了四种拒绝策略,我们可以看下Java提供的四种线程池创建所提供的拒绝策略都是其定义的默认的拒绝策略。那么除了这个拒绝策略其他的拒绝策略都是什么呢?

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

我们可以到拒绝策略是一个接口RejectedExecutionHandler,这也就意味我着我们可以自己订自己的拒绝策略,我们先看一下Java提供四种拒绝策略是什么。

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

AbortPolicy

这个拒绝策略就是Java提供的四种线程池创建方法提供的默认拒绝策略。我们可以看下它的实现。

public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

所以此拒绝策略就是抛RejectedExecutionException异常

CallerRunsPolicy

此拒绝策略简单来说就是将此任务交给调用者直接执行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

这里为什么是交给了调用者来执行呢?我们可以看到它是调用了run()方法,而不是start()方法。

DiscardOldestPolicy

从源码中应该能看出来,此拒绝策略是丢弃队列中最老的任务,然后再执行。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy

从源码中应该能看出来,此拒绝策略是对于当前任务不做任何操作,简单来说就是直接丢弃了当前任务不执行。

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

线程池的拒绝策略给我们默认提供了这四种的实现方式,当然我们也能够自定义拒绝策略使线程池更加符合我们当前的业务,在后面讲解Tomcat自定义自己的线程池时也会讲解它自己实现的拒绝策略。

线程饥饿死锁

线程池为“死锁”这一概念带来了一种新的可能:线程饥饿死锁。在线程池中,如果一个任务另一个任务提交到同一个Executor,那么通常会引发死锁。第二个线程停留在工作队列中等待第一个提交的任务执行完成,但是第一个任务又无法执行完成,因为它在等待第二个任务执行完成。用图表示如下

image

用代码表示的话如下,这里注意我们这里定义的线程池是SingleThreadExecutor,线程池中只有一个线程,这样好模拟出这样的情况,如果在更大的线程池中,如果所有线程都在等待其他仍处于工作队列的任务而阻塞,那么这种情况被称为线程饥饿死锁。所以尽量避免在同一个线程池中处理两种不同类型的任务。

public class AboutThread {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    public static void main(String[] args) {
        AboutThread aboutThread = new AboutThread();
        aboutThread.threadDeadLock();
    }

    public void threadDeadLock(){
        Future<String> taskOne  = executorService.submit(new TaskOne());
        try {
            System.out.printf(taskOne.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public class TaskOne implements Callable{

        @Override
        public Object call() throws Exception {
            Future<String> taskTow = executorService.submit(new TaskTwo());
            return "TaskOne" + taskTow.get();
        }
    }

    public class TaskTwo implements Callable{

        @Override
        public Object call() throws Exception {
            return "TaskTwo";
        }
    }
}

拓展ThreadPoolExecutor

如果我们想要对线程池进行一些扩展,那么可以使用ThreadPoolExecutor给我预留的一些接口可以使我们进行更深层次话的定制线程池。

线程工厂

如果我们想要给我们的线程池中的每个线程自定义一些名称,那么我们就可以使用线程工厂来实现一些自定义化的一些操作。只要我们将我们自定义的工厂传给ThreadPoolExecutor,那么无论何时线程池需要创建一个线程,都要通过我们定义的工厂来进行创建。接下来我们看一下接口ThreadFactory,只要我们实现了此接口就能自定义自己线程独有的信息。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

接下来我们可以看我们自己写的线程池工厂类

class CustomerThreadFactory implements ThreadFactory{

    private String name;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    CustomerThreadFactory(String name){
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r,name+threadNumber.getAndIncrement());
        return thread;
    }
}


只需要在进行线程池实例化的时候将此工厂类加上去即可

   public static void customerThread(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new SynchronousQueue<>(),
                new CustomerThreadFactory("customerThread"));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.printf(Thread.currentThread().getName());
                    System.out.printf("\n");
                }
            });
        }
    }

接下来我们执行此语句,发现每个线程的名字已经变了

customerThread1
customerThread10
customerThread9
customerThread8
customerThread7
customerThread6
customerThread5
customerThread4
customerThread3
customerThread2

通过继承ThreadPoolExecutor扩展

我们查看ThreadPoolExecutor源码可以发现源码中有三个方法都是protected

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

被protected修饰的成员对于本包和其子类可见

我们可以通过继承来覆写这些方法,那么就可以进行我们独有的扩展了。执行任务的线程会调用beforeExecuteafterExecute方法,可以通过它们添加日志、时序、监视器或者同级信息收集的功能。无论任务是正常从run中返回,还是抛出一个异常,afterExecute都会被调用(如果任务完成后抛出一个Error,则afterExecute不会被调用)。如果beforeExecute抛出一个RuntimeException,任务将不会被执行,afterExecute也不会被调用。

在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者手机finalize统计等操作。

本篇文章代码地址

有感兴趣的可以关注一下我新建的公众号,搜索[程序猿的百宝袋]。或者直接扫下面的码也行。

image

参考

上一篇 下一篇

猜你喜欢

热点阅读