高并发篇-浅谈线程池那些事
首先啰嗦两句,高并发并不意味着就是线程池那些玩意,但是线程池的灵活运用可以很好的缓解高QPS的问题。在大型互联网公司里面,线程池的运用可以说是无处不在,可能会有部分同学会说,我extends thread或者implements runnable接口不就行了用啥线程池?但是在集团内部我们代码规定不建议开发者用new thread的形式去开辟一个线程,为什么呢?一方面你可以看到,代码精简了很多,第二个地方,不便于统一管理和监控,第三点就是每次都要创建和销毁,会消耗cpu资源。
线程池、数据库连接池等等各种池化的技术,其实从本质上来讲,都是很相似的,通过空间换取时间,开箱即用,执行完成后线程对象归池,为了复用资源,避免资源频繁的建立和销毁。
线程池的处理流程,直观的看一下:
提交任务.png
下面是总结的线程池里面一些比较核心的参数:
线程池.png
其中讲一下corePoolSize这个参数,corePoolSize表示线程池中的核心线程数,核心线程默认是在有任务提交过来的时候才创建的,当有任务提交时,线程池会创建一个新线程去执行任务,直到当前线程数等于corePoolSize大小;当线程数为corePoolSize的时候,后面的任务将丢到阻塞队列去;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
在线程的参数调优中,如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1,如果是IO密集型任务,参考值可以设置为2*NCPU。但是小白会问怎样的是cpu密集型,啥又是io密集型呢?一般来说,前端频繁的发请求调用后端,后台去查数据库或者缓存,然后返回前端,这种就是就是典型的io密集型,高度依赖IO和网络环境,此时CPU Loading并不高。还有一种叫cpu密集型,依赖cpu去做计算、逻辑判断等等,此时IO不高。具体的参数最终都要真实场景去做灵活的调整,一般我们的项目上线之前,都会有各种测试,甚至pt环境压力测试。
另一个重要的参数keepAliveTime:keepAliveTime这个其实只有在线程大于核心线程数的时候才会有起作用,当这部分的线程空闲时间达到设定keepAliveTime的时间后会终止直达会退到核心线程数的大小。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;但是对于线上一直会有请求进来的业务场景,比如电商项目,一般就不会这么做了,会有核心线程一直常驻在那。
饱和策略:
实际具体选用那种处理策略,要结合自己的业务来挑选,看业务层面是否允许丢弃任务,一般对于重要的消息,我们可以记录日志甚至要做持久化的。线程池默认的策略是AbortPolicy,这个时候会丢弃任务,并且会抛出RejectedExecutionException异常
线程池状态:(就不解释了)
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
关于任务的提交,建议用submit
避雷:在线程池中使用ThreadLocal的时候要千万注意,用完了记得remove,防止内存泄漏
我改造的一个单列模式的线程池Demo以供参考:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPools {
private ExecutorService scheduledPool;
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE=Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAX_POOL_SIZE =CPU_COUNT <<1 + 1;//左移提高速度
private static final int KEEP_ALIVE_TIME =30;
private ThreadPools() {
scheduledPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new NamedThreadFactory("scheduled Pool"));
}
public static ThreadPools getInstance() {
return ThreadPoolManagerHolder.instance;
}
private static class ThreadPoolManagerHolder {
public static ThreadPools instance = new ThreadPools();
}
private static class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public NamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
public void executeScheduledTask(Runnable task) {
try {
scheduledPool.submit(task);
} catch (RejectedExecutionException e) {
LogUtils.ERROR.debug("executeScheduledTask task submit failed");
}
}
public void shutThreadPooldown() {
scheduledPool.shutdown();
}
}