一步步学Java

Java线程池

2017-11-27  本文已影响10人  齐晋

线程池的作用

暂且不表

线程池

java提供的线程池类是ThreadPoolExecutor
下图是类ThreadPoolExecutor的继承关系

ThreadPoolExcecutor继承关系

使用线程池,我们一般关心以下几个场景:

鉴于以上场景,ThreadPoolExecutor有4个构造函数,能够满足所有的需求:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

我们需要先看看构造函数的参数都是什么意思:

int corePoolSize: 线程池中alive线程的最少数量。

默认情况下,创建线程池后,线程池中是没有任何线程的。线程的创建延迟到了请求到达的时候。也就是说,一开始线程池中线程数量是0,来一个请求就创建一个线程。当线程数量达到corePoolSize时,线程池中线程的数量最少为corePoolSize

int maximumPoolSize: 线程池中最大线程数量。

在maximumPoolSize>corePoolSize的情况下,线程池中线程数量可超过corePoolSize,以应对过大的压力。当压力降下来的时候,一部分线程过一段时间会自动销毁,直至数量减少到corePoolSize。如果maximumPoolSize=corePoolSize,那么线程池的大小就是固定的了。
Q: 线程数什么时候才会超过corePoolSize呢?
A: 多余的请求会先进入等待队列,当等待队列满了的时候,会创建新的线程来处理请求(前提是maximumPoolSize>corePoolSize),线程数最大为maximumPoolSize。当还有更多请求时,就要采取抛弃策略了。
Q: 那岂不是新请求会被先执行?

long keepAliveTimeTimeUnit unit: 线程存活时间

maximumPoolSize中提到线程数量可以超过corePoolSize。这些额外的线程过多场时间销毁呢?就是由keepAliveTime和unit决定的。keepAliveTime是个数字,unit表示时间。TimeUnit的可选值有:

  • TimeUnit.DAYS; //天
  • TimeUnit.HOURS; //小时
  • TimeUnit.MINUTES; //分钟
  • TimeUnit.SECONDS; //秒
  • TimeUnit.MILLISECONDS; //毫秒
  • TimeUnit.MICROSECONDS; //微妙
  • TimeUnit.NANOSECONDS; //纳秒

如keepAliveTime=1,unit为TimeUnit.MINUTES,表示多余的线程过1分钟后销毁。

BlockingQueue<Runnable> workQueue:等待队列,存放等待的请求。当线程数达到corePoolSize时,再来的请求会放入等待队列

  • ArrayBlockingQueue: 基于数组的先进先出队列,此队列创建时必须指定大小;
  • LinkedBlockingQueue: 基于链表的先进先出队列,可以指定大小。如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  • SynchronousQueue: 这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

RejectedExecutionHandler handler: 线程池中线程不够用,队列也放不下了,采取什么样的策略处理新请求。

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃线程队列里最近的一个任务,执行新提交的任务
  • ThreadPoolExecutor.CallerRunsPolicy:用调用者的线程来运行任务

根据实际需求,设置好以上这些参数,就能创建出一个可用的线程池。

除了上面的两个构造函数,ThreadPoolExecutor还提供了支持ThreadFactory创建线程的构造函数。如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        
     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
}

ThreadFactory的作用......

有了ThreadPoolExecutor,只需设置一些参数就可以拥有一个线程池,是不是很简单!是!但是还可以更简单。

程序员一直都有懒的天性。因为懒,所以才创造出了各种工具、各种语言。秉承着懒懒更健康的原则,对于常见线程池,JDK提供了创建工具Executors

Executors

Executors是一个创建线程池的工具类。它提供了最常见的线程池的创建方法。不需要我们再苦思冥想设置参数了,只需选择合适的函数,一个最通用的线程池就创建了。

Executors主要提供了以下四种工具:

  • newFixedThreadPool(int nThreads): 创建线程数量固定的线程池。
  • newSingleThreadExecutor(): 创建只有一个线程的线程池。
  • newCachedThreadPool(): 创建不限制线程数量的线程池。
  • newScheduledThreadPool(int corePoolSize): 创建一个定时调度的线程池。

源码如下:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
    new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

通过源码,我们可以知道:

抛弃策略是啥?
有人可能发现工具类中没有提供抛弃策略的参数。其实是使用了默认的抛弃策略:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

Executors中还有其他创建线程池的方法,可自行查阅,选取适合自己需求的使用。

线程池的常用操作

Future<?> submit(Runnable task);

向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回null。

<T> Future<T> submit(Runnable task, T result);

向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回传入的result。

<T> Future<T> submit(Callable<T> task);

向线程池中提交一个Callable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回执行结果。

void shutdown();

关闭线程。已经submit的会继续执行直至结束,不会再接收新的任务

List<Runnable> shutdownNow();

如果shutdown()时,有线程在一直执行,不结束,总不能一直等吧。shutdownNow()会结束所有的线程,返回结果是所有等待任务。

Executor & ExecutorService & Executors

public interface Executor {
    void execute(Runnable command);
}
public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}
public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}

应用实例

理论再多,不如看实际应用中的源码:
metrics-core定时report源码:

public abstract class ScheduledReporter{
    private final ScheduledExecutorService executor;
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit) {
        this(registry, name, filter, rateUnit, durationUnit,
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' + FACTORY_ID.incrementAndGet())));
    }
public void start(long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    report();
                } catch (RuntimeException ex) {
                    LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
                }
            }
        }, period, period, unit);
    }
    
    public void stop() {
        executor.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            executor.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

参考

上一篇下一篇

猜你喜欢

热点阅读