ThreadPoolBuilder构建线程池
2019-12-08 本文已影响0人
但时间也偷换概念
引言
阿里巴巴和唯品会的开发规范都对线程池的使用做了约定:多线程开发中必须通过线程池来开启线程,并且不建议直接使用Executors工具类构建线程池。
除此之外,线程池的管理&监控也十分重要,我们可以对线程池统一管理,使线程池的构建收口到一个工具类中,并且完成注册&释放、监控等。
ThreadPoolBuilder
在唯品会基础架构组calvin很早的blog中开源了一个ThreadPoolBuilder类,这个类提供了线程池的构建方法,我在此基础进行了改造,加入了监控&ttl等功能。
下面看一下这个类。
package edison.io.metric.threadpool;
import com.alibaba.ttl.threadpool.TtlExecutors;
import edison.io.metric.autoconfig.ThreadPoolRegistrar;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.Validate;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
public class ThreadPoolBuilder {
/** Rejection handler shared by every builder that was not given an explicit one; never reassigned. */
private static final RejectedExecutionHandler defaultRejectHandler = new AbortPolicy();
/**
 * Entry point for the general-purpose builder.
 *
 * @return a fresh {@link PoolBuilder} carrying its default settings
 * @see PoolBuilder
 */
public static PoolBuilder pool() {
    final PoolBuilder builder = new PoolBuilder();
    return builder;
}
/**
 * Entry point for the fixed-size pool builder.
 *
 * @return a fresh {@link FixedThreadPoolBuilder} carrying its default settings
 * @see FixedThreadPoolBuilder
 */
public static FixedThreadPoolBuilder fixedPool() {
    final FixedThreadPoolBuilder builder = new FixedThreadPoolBuilder();
    return builder;
}
/**
 * Entry point for the cached pool builder.
 *
 * @return a fresh {@link CachedThreadPoolBuilder} carrying its default settings
 * @see CachedThreadPoolBuilder
 */
public static CachedThreadPoolBuilder cachedPool() {
    final CachedThreadPoolBuilder builder = new CachedThreadPoolBuilder();
    return builder;
}
/**
 * Entry point for the scheduled pool builder.
 *
 * @return a fresh {@link ScheduledThreadPoolBuilder} carrying its default settings
 * @see ScheduledThreadPoolBuilder
 */
public static ScheduledThreadPoolBuilder scheduledPool() {
    final ScheduledThreadPoolBuilder builder = new ScheduledThreadPoolBuilder();
    return builder;
}
/**
 * Entry point for the queue-backed cached pool builder.
 *
 * @return a fresh {@link QueuableCachedThreadPoolBuilder} carrying its default settings
 * @see QueuableCachedThreadPoolBuilder
 */
public static QueuableCachedThreadPoolBuilder queuableCachedPool() {
    final QueuableCachedThreadPoolBuilder builder = new QueuableCachedThreadPoolBuilder();
    return builder;
}
/**
 * Picks the thread factory a pool should use, in priority order:
 * <ol>
 * <li>the explicit {@code threadFactory}, when one was supplied;</li>
 * <li>a naming factory built from {@code threadNamePrefix}, with the daemon flag applied only when
 * {@code daemon} is non-null;</li>
 * <li>{@link Executors#defaultThreadFactory()} when neither was supplied.</li>
 * </ol>
 *
 * @param threadFactory    caller-provided factory, may be null
 * @param threadNamePrefix prefix for generated thread names, may be null
 * @param daemon           daemon flag for the naming factory, may be null
 * @return the factory selected by the rules above
 */
private static ThreadFactory createThreadFactory(ThreadFactory threadFactory, String threadNamePrefix,
        Boolean daemon) {
    // An explicit factory always wins over the prefix/daemon pair.
    if (threadFactory != null) {
        return threadFactory;
    }
    // Nothing to customise: fall back to the JDK default.
    if (threadNamePrefix == null) {
        return Executors.defaultThreadFactory();
    }
    return daemon == null
            ? ThreadPoolUtils.buildThreadFactory(threadNamePrefix)
            : ThreadPoolUtils.buildThreadFactory(threadNamePrefix, daemon);
}
/**
 * Builder for a fixed-size pool (core size equals max size). Setting {@code queueSize} so that the queue is
 * bounded is strongly recommended.
 * <p>
 * 1. While fewer than {@code poolSize} threads exist, every submission starts a new thread bound to that task
 * (after {@code poolSize} submissions the pool is full; earlier threads are not reused before then).
 * {@code poolSize} defaults to 1, i.e. a single-thread pool.
 * <p>
 * 2. Later submissions are placed in the queue, from which the pool threads take work. The queue is an
 * unbounded {@link LinkedBlockingQueue} by default; a positive {@code queueSize} switches to a bounded
 * {@link ArrayBlockingQueue}.
 * <p>
 * With a bounded queue, a full queue triggers the rejection handler. The default is {@code AbortPolicy}, which
 * throws {@link RejectedExecutionException}; other policies silently drop the task (Discard), drop the oldest
 * queued task (DiscardOldest) or run the task on the submitting thread (CallerRuns).
 * <p>
 * 3. All threads are core threads, so idle threads are not reclaimed.
 */
public static class FixedThreadPoolBuilder {
    private int poolSize = 1;
    private int queueSize = -1;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean daemon;
    private Boolean useTtl;
    private String threadPoolName;
    private RejectedExecutionHandler rejectHandler;

    /**
     * Sets the pool size; defaults to 1 (single-thread pool).
     *
     * @throws IllegalArgumentException if {@code poolSize} is less than 1
     */
    public FixedThreadPoolBuilder setPoolSize(int poolSize) {
        Validate.isTrue(poolSize >= 1);
        this.poolSize = poolSize;
        return this;
    }

    /**
     * Sets the queue capacity. Left at the default of -1 (or any value below 1) the pool uses an unbounded
     * {@link LinkedBlockingQueue}; a positive value selects an {@link ArrayBlockingQueue} of that capacity.
     */
    public FixedThreadPoolBuilder setQueueSize(int queueSize) {
        this.queueSize = queueSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public FixedThreadPoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public FixedThreadPoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public FixedThreadPoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /**
     * Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence.
     * <p>
     * Defaults to null, in which case the daemon flag is not set and the JDK default applies.
     */
    public FixedThreadPoolBuilder setDaemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /** Sets the rejection handler; when left null the shared default handler is used. */
    public FixedThreadPoolBuilder setRejectHanlder(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public FixedThreadPoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the pool, registers the underlying {@link ThreadPoolExecutor} under {@code threadPoolName} and
     * optionally wraps it with TTL support.
     * <p>
     * Resolved defaults are kept in locals rather than written back to the builder, so the same builder can be
     * reconfigured (for example with a different name prefix) and built again.
     *
     * @return the pool, TTL-wrapped when {@code useTtl} is true
     */
    public ExecutorService build() {
        final BlockingQueue<Runnable> queue;
        if (queueSize < 1) {
            queue = new LinkedBlockingQueue<Runnable>();
        } else {
            queue = new ArrayBlockingQueue<Runnable>(queueSize);
        }
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
        final RejectedExecutionHandler handler = rejectHandler != null ? rejectHandler : defaultRejectHandler;
        final ThreadPoolExecutor pool =
                new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, queue, factory, handler);
        // Register the raw executor: the TTL wrapper is not a ThreadPoolExecutor.
        ThreadPoolRegistrar.register(threadPoolName, pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlExecutorService(pool);
        }
        return pool;
    }
}
/**
 * Builder for a cached pool backed by a {@link SynchronousQueue}. Setting {@code maxSize} is recommended.
 * <p>
 * 1. While fewer than {@code minSize} threads exist, every submission starts a new thread bound to that task
 * (after {@code minSize} submissions that many threads exist; earlier threads are not reused before then).
 * {@code minSize} defaults to 0; raise it to keep a baseline of threads that is not reclaimed.
 * <p>
 * 2. After that, a task is handed to the {@link SynchronousQueue}; if no idle thread takes it immediately a new
 * thread is created, until the total reaches {@code maxSize} (default {@link Integer#MAX_VALUE}).
 * <p>
 * Once {@code maxSize} is reached the rejection handler is invoked. The default is {@code AbortPolicy}, which
 * throws {@link RejectedExecutionException}; other policies silently drop the task (Discard) or run it on the
 * submitting thread (CallerRuns).
 * <p>
 * 3. Threads above {@code minSize} that poll no task within the keep-alive time are terminated. The keep-alive
 * defaults to 10 seconds here because the JDK's 60 seconds is too long: with e.g. 1000 threads, reclamation
 * would only start once the load drops below roughly 16 QPS.
 */
public static class CachedThreadPoolBuilder {
    private int minSize = 0;
    private int maxSize = Integer.MAX_VALUE;
    private int keepAliveSecs = 10;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean daemon;
    private Boolean useTtl;
    private String threadPoolName;
    private RejectedExecutionHandler rejectHandler;

    /** Sets the core (minimum) number of threads; defaults to 0. */
    public CachedThreadPoolBuilder setMinSize(int minSize) {
        this.minSize = minSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public CachedThreadPoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Sets the maximum number of threads; defaults to {@link Integer#MAX_VALUE}, so setting it is advised. */
    public CachedThreadPoolBuilder setMaxSize(int maxSize) {
        this.maxSize = maxSize;
        return this;
    }

    /**
     * Sets the idle keep-alive in seconds. Defaults to 10 instead of the JDK's 60, which is too long: with
     * e.g. 1000 threads, reclamation would only start below roughly 16 QPS.
     */
    public CachedThreadPoolBuilder setKeepAliveSecs(int keepAliveSecs) {
        this.keepAliveSecs = keepAliveSecs;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public CachedThreadPoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public CachedThreadPoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /**
     * Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence.
     * <p>
     * Defaults to null, in which case the daemon flag is not set and the JDK default applies.
     */
    public CachedThreadPoolBuilder setDaemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /** Sets the rejection handler; when left null the shared default handler is used. */
    public CachedThreadPoolBuilder setRejectHanlder(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public CachedThreadPoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the pool, registers the underlying {@link ThreadPoolExecutor} under {@code threadPoolName} and
     * optionally wraps it with TTL support.
     * <p>
     * Resolved defaults are kept in locals rather than written back to the builder, so the same builder can be
     * reconfigured and built again.
     *
     * @return the pool, TTL-wrapped when {@code useTtl} is true
     */
    public ExecutorService build() {
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
        final RejectedExecutionHandler handler = rejectHandler != null ? rejectHandler : defaultRejectHandler;
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(minSize, maxSize, keepAliveSecs,
                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory, handler);
        // Register the raw executor: the TTL wrapper is not a ThreadPoolExecutor.
        ThreadPoolRegistrar.register(threadPoolName, pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlExecutorService(pool);
        }
        return pool;
    }
}
/**
 * Builder for a scheduled pool backed by {@link ScheduledThreadPoolExecutor}.
 * <p>
 * When only a name prefix is given (no explicit factory), the generated threads are daemon threads.
 */
public static class ScheduledThreadPoolBuilder {
    private int poolSize = 1;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean useTtl;
    private String threadPoolName;

    /** Sets the core pool size; defaults to 1. */
    public ScheduledThreadPoolBuilder setPoolSize(int poolSize) {
        this.poolSize = poolSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public ScheduledThreadPoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public ScheduledThreadPoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public ScheduledThreadPoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public ScheduledThreadPoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the scheduled pool, registers the underlying executor under {@code threadPoolName} and
     * optionally wraps it with TTL support.
     * <p>
     * The resolved thread factory is kept in a local rather than written back to the builder, so the same
     * builder can be reconfigured and built again.
     *
     * @return the scheduled pool, TTL-wrapped when {@code useTtl} is true
     */
    public ScheduledExecutorService build() {
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, Boolean.TRUE);
        // Typed as the concrete executor so it can be registered without a cast.
        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(poolSize, factory);
        ThreadPoolRegistrar.register(threadPoolName, pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlScheduledExecutorService(pool);
        }
        return pool;
    }
}
/**
 * Builder for the growable, queue-backed pool ported from Tomcat. The scheduling behaviour itself lives in
 * {@link QueuableCachedThreadPool}; the author's notes describe it as follows:
 * <p>
 * 1. While the thread count is below {@code minSize}, submitted tasks go to the queue for the threads to
 * pick up.
 * <p>
 * 2. Once {@code minSize} is reached, a new thread is created directly to run the task.
 * <p>
 * 3. Once {@code maxSize} is reached, the task is offered to the queue; if the queue (capacity
 * {@code queueSize}) is full, the offer is retried for up to {@code queueOfferTimeOutMillisSecs}, and if it
 * is still full the rejection handler is invoked.
 * <p>
 * 4. Threads above {@code minSize} that poll no task within the keep-alive time are terminated. The keep-alive
 * defaults to 10 seconds here because the JDK's 60 seconds is too long: with e.g. 1000 threads, reclamation
 * would only start once the load drops below roughly 16 QPS.
 *
 * @see QueuableCachedThreadPool
 */
public static class QueuableCachedThreadPoolBuilder {
    private int minSize = 0;
    private int maxSize = Integer.MAX_VALUE;
    private int keepAliveSecs = 10;
    private int queueSize = 100;
    private long queueOfferTimeOutMillisSecs = 0;
    private String threadPoolName;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean daemon;
    private Boolean useTtl;
    private RejectedExecutionHandler rejectHandler;

    /** Sets the core (minimum) number of threads; defaults to 0. */
    public QueuableCachedThreadPoolBuilder setMinSize(int minSize) {
        this.minSize = minSize;
        return this;
    }

    /** Sets the maximum number of threads; defaults to {@link Integer#MAX_VALUE}. */
    public QueuableCachedThreadPoolBuilder setMaxSize(int maxSize) {
        this.maxSize = maxSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public QueuableCachedThreadPoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Sets the queue-offer timeout passed to the pool; defaults to 0. */
    public QueuableCachedThreadPoolBuilder setQueueOfferTimeOutMillisSecs(int queueOfferTimeOutMillisSecs) {
        this.queueOfferTimeOutMillisSecs = queueOfferTimeOutMillisSecs;
        return this;
    }

    /** Sets the queue capacity; defaults to 100. */
    public QueuableCachedThreadPoolBuilder setQueueSize(int queueSize) {
        this.queueSize = queueSize;
        return this;
    }

    /** Sets the idle keep-alive in seconds; defaults to 10. */
    public QueuableCachedThreadPoolBuilder setKeepAliveSecs(int keepAliveSecs) {
        this.keepAliveSecs = keepAliveSecs;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public QueuableCachedThreadPoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public QueuableCachedThreadPoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /**
     * Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence.
     * <p>
     * Defaults to null, in which case the daemon flag is not set and the JDK default applies.
     */
    public QueuableCachedThreadPoolBuilder setDaemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /** Sets the rejection handler; when left null the shared default handler is used. */
    public QueuableCachedThreadPoolBuilder setRejectHanlder(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public QueuableCachedThreadPoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the pool, registers it under {@code threadPoolName} and optionally wraps it with TTL support.
     * <p>
     * Resolved defaults are kept in locals rather than written back to the builder, so the same builder can be
     * reconfigured and built again.
     *
     * @return the pool, TTL-wrapped when {@code useTtl} is true
     */
    public ExecutorService build() {
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
        final RejectedExecutionHandler handler = rejectHandler != null ? rejectHandler : defaultRejectHandler;
        final ExecutorService pool =
                new QueuableCachedThreadPool(minSize, maxSize, keepAliveSecs, TimeUnit.SECONDS,
                        new QueuableCachedThreadPool.ControllableQueue(queueSize), factory, handler,
                        queueOfferTimeOutMillisSecs);
        // Register the raw executor before any TTL wrapping.
        ThreadPoolRegistrar.register(threadPoolName, (ThreadPoolExecutor) pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlExecutorService(pool);
        }
        return pool;
    }
}
/**
 * General-purpose builder for a {@link ThreadPoolExecutor} whose work queue is supplied by the caller.
 * A work queue is mandatory; every other setting has a default.
 */
public static class PoolBuilder {
    private int coreSize = 0;
    private int maxSize = Integer.MAX_VALUE;
    private int keepAliveSecs = 10;
    private String threadPoolName;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean daemon;
    private Boolean useTtl;
    private RejectedExecutionHandler rejectHandler;
    private BlockingQueue<Runnable> workQueue;

    /** Sets the core number of threads; defaults to 0. */
    public PoolBuilder setCoreSize(int coreSize) {
        this.coreSize = coreSize;
        return this;
    }

    /** Sets the maximum number of threads; defaults to {@link Integer#MAX_VALUE}, so setting it is advised. */
    public PoolBuilder setMaxSize(int maxSize) {
        this.maxSize = maxSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public PoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Sets the work queue; required, {@link #build()} fails without it. */
    public PoolBuilder setWorkQueue(BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        return this;
    }

    /**
     * Sets the idle keep-alive in seconds. Defaults to 10 instead of the JDK's 60, which is too long: with
     * e.g. 1000 threads, reclamation would only start below roughly 16 QPS.
     */
    public PoolBuilder setKeepAliveSecs(int keepAliveSecs) {
        this.keepAliveSecs = keepAliveSecs;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public PoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public PoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /**
     * Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence.
     * <p>
     * Defaults to null, in which case the daemon flag is not set and the JDK default applies.
     */
    public PoolBuilder setDaemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /** Sets the rejection handler; when left null the shared default handler is used. */
    public PoolBuilder setRejectHanlder(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public PoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the pool, registers the underlying {@link ThreadPoolExecutor} under {@code threadPoolName} and
     * optionally wraps it with TTL support.
     * <p>
     * Resolved defaults are kept in locals rather than written back to the builder, so the same builder can be
     * reconfigured and built again.
     *
     * @return the pool, TTL-wrapped when {@code useTtl} is true
     * @throws NullPointerException if no work queue was set
     */
    public ExecutorService build() {
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
        final RejectedExecutionHandler handler = rejectHandler != null ? rejectHandler : defaultRejectHandler;
        if (workQueue == null) {
            throw new NullPointerException("workQueue can't be null !!!");
        }
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(coreSize, maxSize, keepAliveSecs,
                TimeUnit.SECONDS, workQueue, factory, handler);
        // Register the raw executor: the TTL wrapper is not a ThreadPoolExecutor.
        ThreadPoolRegistrar.register(threadPoolName, pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlExecutorService(pool);
        }
        return pool;
    }
}
}
package edison.io.metric.threadpool;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
 * Thread pool helpers:
 * <ol>
 * <li>graceful pool shutdown (via Guava);</li>
 * <li>thread factories that produce custom thread names (via Guava);</li>
 * <li>a wrapper that stops uncaught exceptions in third-party {@link Runnable}s from escaping into the pool.</li>
 * </ol>
 */
public class ThreadPoolUtils {
    /**
     * Gracefully shuts a pool down, with the timeout given in milliseconds.
     * <p>
     * Delegates to Guava, which follows the shutdown recipe from the {@link ExecutorService} Javadoc: call
     * {@code shutdown()} and wait for half of the timeout, then call {@code shutdownNow()} and wait for the
     * other half, handling interruption of the calling thread along the way.
     *
     * @param threadPool           the pool to stop; null is accepted and treated as already stopped
     * @param shutdownTimeoutMills total timeout in milliseconds
     * @return true if {@code threadPool} is null, otherwise the result reported by Guava
     * @see MoreExecutors#shutdownAndAwaitTermination(ExecutorService, long, TimeUnit)
     */
    public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeoutMills) {
        return threadPool == null
                || MoreExecutors.shutdownAndAwaitTermination(threadPool, shutdownTimeoutMills, TimeUnit.MILLISECONDS);
    }

    /**
     * Gracefully shuts a pool down, with the timeout given in an arbitrary unit.
     *
     * @param threadPool      the pool to stop; null is accepted and treated as already stopped
     * @param shutdownTimeout total timeout
     * @param timeUnit        unit of {@code shutdownTimeout}
     * @return true if {@code threadPool} is null, otherwise the result reported by Guava
     * @see MoreExecutors#shutdownAndAwaitTermination(ExecutorService, long, TimeUnit)
     */
    public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeout, TimeUnit timeUnit) {
        return threadPool == null || MoreExecutors.shutdownAndAwaitTermination(threadPool, shutdownTimeout, timeUnit);
    }

    /**
     * Creates a factory whose threads are named {@code <prefix>-<n>} instead of the default
     * "pool-x-thread-y".
     *
     * @param threadNamePrefix literal prefix of the thread names
     * @see ThreadFactoryBuilder#build()
     */
    public static ThreadFactory buildThreadFactory(String threadNamePrefix) {
        return new ThreadFactoryBuilder().setNameFormat(toNameFormat(threadNamePrefix)).build();
    }

    /**
     * Same as {@link #buildThreadFactory(String)} but also sets the daemon flag. Daemon threads do not keep the
     * application alive once the main thread has finished; non-daemon threads do.
     *
     * @param threadNamePrefix literal prefix of the thread names
     * @param daemon           whether created threads are daemon threads
     * @see ThreadFactoryBuilder#setDaemon(boolean)
     */
    public static ThreadFactory buildThreadFactory(String threadNamePrefix, boolean daemon) {
        return new ThreadFactoryBuilder().setNameFormat(toNameFormat(threadNamePrefix)).setDaemon(daemon).build();
    }

    /**
     * Turns a literal prefix into the format string expected by {@code setNameFormat}. The prefix is not a
     * format itself, so any '%' in it is escaped; otherwise it would be read as a format specifier.
     */
    private static String toNameFormat(String threadNamePrefix) {
        // String.valueOf keeps the historical "null-%d" result for a null prefix.
        return String.valueOf(threadNamePrefix).replace("%", "%%") + "-%d";
    }

    /**
     * Wraps a {@link Runnable} so that nothing it throws reaches the pool. Intended for third-party tasks whose
     * implementation cannot be changed, so that an uncaught exception does not stop a scheduled task from
     * running again.
     *
     * @param runnable the task to protect, must not be null
     * @return a wrapper that logs instead of propagating
     */
    public static Runnable safeRunnable(Runnable runnable) {
        return new SafeRunnable(runnable);
    }

    /**
     * {@link Runnable} decorator that logs anything thrown by the delegate and never rethrows it.
     */
    private static class SafeRunnable implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(SafeRunnable.class);
        private final Runnable runnable;

        public SafeRunnable(Runnable runnable) {
            this.runnable = Preconditions.checkNotNull(runnable);
        }

        @Override
        public void run() {
            try {
                runnable.run();
            } catch (Throwable e) {
                // catch any exception, because the scheduled thread will break if the exception thrown to outside.
                logger.error("Unexpected error occurred in task", e);
            }
        }
    }
}
我以其中一种builder模式介绍
/**
 * General-purpose builder for a {@link ThreadPoolExecutor} whose work queue is supplied by the caller.
 * A work queue is mandatory; every other setting has a default.
 */
public static class PoolBuilder {
    private int coreSize = 0;
    private int maxSize = Integer.MAX_VALUE;
    private int keepAliveSecs = 10;
    private String threadPoolName;
    private ThreadFactory threadFactory;
    private String threadNamePrefix;
    private Boolean daemon;
    private Boolean useTtl;
    private RejectedExecutionHandler rejectHandler;
    private BlockingQueue<Runnable> workQueue;

    /** Sets the core number of threads; defaults to 0. */
    public PoolBuilder setCoreSize(int coreSize) {
        this.coreSize = coreSize;
        return this;
    }

    /** Sets the maximum number of threads; defaults to {@link Integer#MAX_VALUE}, so setting it is advised. */
    public PoolBuilder setMaxSize(int maxSize) {
        this.maxSize = maxSize;
        return this;
    }

    /** Sets the alias under which the pool is registered. */
    public PoolBuilder setThreadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /** Sets the work queue; required, {@link #build()} fails without it. */
    public PoolBuilder setWorkQueue(BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        return this;
    }

    /**
     * Sets the idle keep-alive in seconds. Defaults to 10 instead of the JDK's 60, which is too long: with
     * e.g. 1000 threads, reclamation would only start below roughly 16 QPS.
     */
    public PoolBuilder setKeepAliveSecs(int keepAliveSecs) {
        this.keepAliveSecs = keepAliveSecs;
        return this;
    }

    /** Mutually exclusive with {@code threadNamePrefix}; an explicit factory takes precedence. */
    public PoolBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /** Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence. */
    public PoolBuilder setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
        return this;
    }

    /**
     * Mutually exclusive with {@code threadFactory}; an explicit factory takes precedence.
     * <p>
     * Defaults to null, in which case the daemon flag is not set and the JDK default applies.
     */
    public PoolBuilder setDaemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /** Sets the rejection handler; when left null the shared default handler is used. */
    public PoolBuilder setRejectHanlder(RejectedExecutionHandler rejectHandler) {
        this.rejectHandler = rejectHandler;
        return this;
    }

    /** When true, the built pool is wrapped by {@code TtlExecutors}. */
    public PoolBuilder setUseTtl(Boolean useTtl) {
        this.useTtl = useTtl;
        return this;
    }

    /**
     * Creates the pool, registers the underlying {@link ThreadPoolExecutor} under {@code threadPoolName} and
     * optionally wraps it with TTL support.
     * <p>
     * Resolved defaults are kept in locals rather than written back to the builder, so the same builder can be
     * reconfigured and built again.
     *
     * @return the pool, TTL-wrapped when {@code useTtl} is true
     * @throws NullPointerException if no work queue was set
     */
    public ExecutorService build() {
        final ThreadFactory factory = createThreadFactory(threadFactory, threadNamePrefix, daemon);
        final RejectedExecutionHandler handler = rejectHandler != null ? rejectHandler : defaultRejectHandler;
        if (workQueue == null) {
            throw new NullPointerException("workQueue can't be null !!!");
        }
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(coreSize, maxSize, keepAliveSecs,
                TimeUnit.SECONDS, workQueue, factory, handler);
        // Register the raw executor: the TTL wrapper is not a ThreadPoolExecutor.
        ThreadPoolRegistrar.register(threadPoolName, pool);
        if (BooleanUtils.isTrue(useTtl)) {
            return TtlExecutors.getTtlExecutorService(pool);
        }
        return pool;
    }
}
从上述代码可以看到,调用方通过builder模式传入线程池所需的参数,再调用build方法生成线程池实例。除了创建线程池之外,build方法还完成了线程池的注册、监控、ttl包装、释放钩子等相关操作。
TtlExecutors是阿里巴巴开源的线程池解决方案,解决的问题是ThreadLocal线程池场景父子传递&脏数据问题,搭配TransmittableThreadLocal使用。
再看一下ThreadPoolRegistrar.register做了什么。
package edison.io.metric.autoconfig;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import edison.io.metric.threadpool.ThreadPoolUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Process-wide registry of the thread pools created by the builders. Pools are stored by name so they can be
 * looked up later, and a JVM shutdown hook installed from {@link #run(ApplicationArguments)} shuts every
 * registered pool down.
 */
@Component
public class ThreadPoolRegistrar implements ApplicationRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolRegistrar.class);
    /** Registered pools keyed by pool name (a random UUID when no name was given). */
    public static Map<String, ThreadPoolExecutor> CONTAINER = Maps.newConcurrentMap();

    /**
     * Registers a pool under the given name, or under a random UUID when the name is null.
     * <p>
     * Registering a second pool under an existing name replaces the earlier entry; because the replaced pool is
     * then no longer reachable from the shutdown hook, a warning is logged.
     *
     * @param threadPoolName  registry key, may be null
     * @param executorService the pool to register, must not be null
     * @throws NullPointerException if {@code executorService} is null
     */
    public static void register(String threadPoolName, ThreadPoolExecutor executorService) {
        Preconditions.checkNotNull(executorService);
        final String key = StringUtils.defaultString(threadPoolName, UUID.randomUUID().toString());
        final ThreadPoolExecutor previous = CONTAINER.put(key, executorService);
        if (previous != null && previous != executorService) {
            LOGGER.warn("Thread pool name '{}' was already registered; the earlier pool is no longer tracked", key);
        }
    }

    /**
     * Looks a pool up by name.
     *
     * @param threadPoolName registry key; null yields null instead of failing on the concurrent map
     * @return the registered pool, or null when the name is null or unknown
     */
    public static ThreadPoolExecutor getInstance(String threadPoolName) {
        if (threadPoolName == null) {
            return null;
        }
        return CONTAINER.get(threadPoolName);
    }

    /**
     * Installs a JVM shutdown hook that gracefully stops every registered pool, allowing each one up to
     * 10 seconds. A failure on one pool is logged with its stack trace and does not stop the others.
     */
    @Override
    public void run(ApplicationArguments applicationArguments) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (ExecutorService executorService : CONTAINER.values()) {
                try {
                    ThreadPoolUtils.gracefulShutdown(executorService, 10, TimeUnit.SECONDS);
                } catch (Exception ex) {
                    // Pass the exception itself so the stack trace is not lost.
                    LOGGER.error("Failed to shut down thread pool gracefully", ex);
                }
            }
        }));
    }
}
这里是做了线程池的容器化注册,与ShutDownHook释放资源。
getInstance方法是为监控预留的:我们可以起一个定时job,按名称取出线程池并采集其各项指标。
使用方式
- 注册线程池到Spring
- 从Spring中获取线程池并使用
package edison.io.metric.autoconfig;
import edison.io.metric.threadpool.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@Configuration
public class ThreadPoolConfig {
@Bean
public ExecutorService httpThreadPool() {
return ThreadPoolBuilder.pool().setCoreSize(5).setMaxSize(9)
.setDaemon(false).setKeepAliveSecs(60).setThreadNamePrefix("httpTask")
.setUseTtl(true).setWorkQueue(new ArrayBlockingQueue<>(1000))
.setThreadPoolName("httpPool").build();
}
}
package edison.io;
import edison.io.metric.MetricApplication;
import edison.io.metric.autoconfig.ThreadPoolRegistrar;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Smoke test: submits three tasks to the Spring-managed "httpThreadPool" bean, then prints the metrics of the
 * pool registered as "httpPool". The tasks are not awaited, so task output and metrics may interleave.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MetricApplication.class})
public class ThreadPoolBuilderTest {
    @Autowired
    @Qualifier("httpThreadPool")
    ExecutorService executorService;

    @Test
    public void threadPoolBuilder() {
        // Fire three independent tasks at the pool under test.
        for (int submitted = 0; submitted < 3; submitted++) {
            CompletableFuture.runAsync(() -> System.out.println("fork success"), executorService);
        }
        // The registrar holds the raw executor, which exposes the metrics.
        final ThreadPoolExecutor registeredPool = ThreadPoolRegistrar.getInstance("httpPool");
        final int poolSize = registeredPool.getPoolSize();
        System.out.println(poolSize);
        final int activeCount = registeredPool.getActiveCount();
        System.out.println(activeCount);
        final int corePoolSize = registeredPool.getCorePoolSize();
        System.out.println(corePoolSize);
        final int maximumPoolSize = registeredPool.getMaximumPoolSize();
        System.out.println(maximumPoolSize);
        final long taskCount = registeredPool.getTaskCount();
        System.out.println(taskCount);
    }
}
日志如下:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.2.RELEASE)
2019-12-08 16:18:29.232 WARN 2334 --- [ main] o.s.boot.StartupInfoLogger : InetAddress.getLocalHost().getHostName() took 5005 milliseconds to respond. Please verify your network configuration (macOS machines may need to add entries to /etc/hosts).
2019-12-08 16:18:34.239 INFO 2334 --- [ main] edison.io.ThreadPoolBuilderTest : Starting ThreadPoolBuilderTest on zhengyunie.local with PID 2334 (started by zhengyu.nie in /Users/zhengyu.nie/IdeaProjects/metrics)
2019-12-08 16:18:34.240 INFO 2334 --- [ main] edison.io.ThreadPoolBuilderTest : No active profile set, falling back to default profiles: default
2019-12-08 16:18:35.588 INFO 2334 --- [ main] edison.io.ThreadPoolBuilderTest : Started ThreadPoolBuilderTest in 16.681 seconds (JVM running for 18.26)
fork success
fork success
3
fork success
1
5
9
3
基于ThreadPoolBuilder&ThreadPoolRegistrar实现,下一篇blog我会使用influxdb+grafana进行线程池监控,完成一整套解决方案。