线程池Java 杂谈

java线程池自动扩容

2018-01-26  本文已影响22人  爱吃鱼aichiyu

线程池构造方法有几个重要参数:

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,空闲线程存活时间
                              TimeUnit unit,//空闲时间单位
                              BlockingQueue<Runnable> workQueue//任务大于线程池数量时,用于保存任务的队列
) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

当线程池核心数量不够时,新加入的任务会被存放在队列中,如果队列存满了,线程池会创建更多的线程,直到maximumPoolSize。如果还不足以处理新的任务,则面临一个丢弃策略,默认的丢弃策略是抛异常!
常用的Executors.newCachedThreadPool()和Executors.newFixedThreadPool(n),它的队列都是Integer.MAX_VALUE,所以maximumPoolSize和keepAliveTime参数就没有意义了。
如果要自己实现一个线程自动扩容方案呢?以下代码供大家测试探讨

/**
 * Created on 2018/1/26 12:55
 * <p>
 * Description: [线程池自动扩容]
 * <p>
 * Company: [xxx]
 *
 * @author [aichiyu]
 */
public class TestAutoAdjustThreadPool {

    /**
     * 队列阈值,超过此值则扩大线程池
     */
    private static final int MAX_QUEUE_SIZE = 100;

    /**
     * 每次扩容自动增加线程数
     */
    private static final int PER_ADD_THREAD = 10;

    /**
     * 监控积压时间频率
     */
    private static final int MONITOR_DELAY_TIME = 1;

    private ScheduledExecutorService scheduledExecutorService ;

    private ThreadPoolExecutor executor ;


    public void start(){
        executor =new ThreadPoolExecutor(10, 100,
                60L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        scheduledExecutorService = new ScheduledThreadPoolExecutor(10, new BasicThreadFactory.Builder().namingPattern("mq-monitor-schedule-pool-%d").daemon(true).build());
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            System.out.println("当前线程池状态!"+executor);
            //当队列大小超过限制,且jvm内存使用率小于80%时扩容,防止无限制扩容
            if(executor.getQueue().size() >= MAX_QUEUE_SIZE && executor.getPoolSize()< executor.getMaximumPoolSize() && getMemoryUsage()<0.8){
                System.out.println("线程池扩容!"+executor);
                executor.setCorePoolSize(executor.getPoolSize() + PER_ADD_THREAD);
            }
            //当队列大小小于限制的80%,线程池缩容
            if(executor.getPoolSize() > 0  && executor.getQueue().size() < MAX_QUEUE_SIZE * 0.8  ){
                System.out.println("线程池缩容!"+executor);
                executor.setCorePoolSize(executor.getPoolSize() - PER_ADD_THREAD);
            }


        }, MONITOR_DELAY_TIME, MONITOR_DELAY_TIME, TimeUnit.SECONDS);
    }

    public void stop() throws InterruptedException {
        executor.shutdown();
        while (!executor.awaitTermination(1,TimeUnit.SECONDS)){
            //等待线程池中任务执行完毕
        }
        scheduledExecutorService.shutdown();
    }

    public <T> Future<T> submit(Callable<T> task) {
        return executor.submit(task);
    }
    public Future<?> submit(Runnable task) {
        return executor.submit(task);
    }


    /**
     * 获取jvm内存使用率
     * @return
     */
    public static double getMemoryUsage() {
        return (double) (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / Runtime.getRuntime().maxMemory();
    }


    public static void main(String[] args) throws InterruptedException {
        TestThreadPoolAutoExpand pool = new TestThreadPoolAutoExpand();
        pool.start();
        for (int i = 0; i < 1000; i++) {
            pool.submit(()->{
                System.out.println(Thread.currentThread()+" execute!~~");
                try {
                    Thread.sleep(1500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        pool.stop();
    }
}
上一篇下一篇

猜你喜欢

热点阅读