Java 杂谈

Java不魔改默认线程池机制来实现可伸缩的线程池

2019-04-18  本文已影响0人  大侠陈

java的线程池提供设置基本大小和最大大小两个参数来实现可伸缩的线程池

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)

可java的默认实现有时候却并不是我们想要的,在这个默认实现中,线程池创建之初,线程池会创建 corePoolSize 个数量的线程,这些线程不停的处理 workQueue 中的任务, 当 workQueue 被填满,当前所有线程全部处于忙碌状态,已经无法腾出时间来处理新任务,线程池才会创建新的线程来处理任务, 也就是说,如果workQueue未被填满,线程池是不会创建新线程的。 那么,当 workQueue 是个无界队列时,maximumPoolSize 参数是无效的。 而当 workQueue 是有界队列时,假如即使 maximumPoolSize 个线程也无法满足任务处理的速率,则那些未能被处理的任务将被饱和策略退回,有时候这并不符合我们的预期。 所以, 真正的能满足我们需求的可伸缩线程池需要我们自己实现。

在这里, 我讲解两种方法

第一种比较简单,只需要单纯的设置默认线程池的参数即可实现

public class CustomThreadPool implements Executor {
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor threadPoolExecutor;

    public CustomThreadPool(int nThreads, int keepAliveTime) {
        threadPoolExecutor = new ThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.SECONDS, queue);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }

    @Override
    public void execute(@NotNull Runnable command) {
        threadPoolExecutor.execute(command);
    }
}

我们通过默认线程池构造函数初始化一个线程池, 将 corePoolSize 和 maximumPoolSize 的值设置成相同, 然后再设置空闲线程的回收时间,这样我们便创建了一个固定大小的线程池。 然而,此时线程池空闲的线程是无法被回收的,因为可以被回收的线程只能是 maximumPoolSize 减去 corePoolSize 个数量的线程, 此时这个数量为0,自然也就不会有线程被回收。

但是我们可以

  threadPoolExecutor.allowCoreThreadTimeOut(true); 

通过设置这个值来取消上面的限制, 当allowCoreThreadTimeOut 设置为true,无关 corePoolSize 的大小, 只要满足回收条件就都可以被回收, 如果所有线程都空闲,则所有线程都会被回收。

测试代码如下

CustomThreadPool customThreadPool = new CustomThreadPool(10,60);

int count = 0;

while (count < 200) {
  customThreadPool.execute(() -> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("hello world");
  });
  count++;
}

这种方法的好处是简单,坏处是线程的创建和回收频率比较高

第二种方法克服了第一种方法存在的问题,可是需要一条额外的线程来管理任务

public class ElasticityThreadPool implements Executor {
    SynchronousQueue<Runnable> tasks = new SynchronousQueue<>();
    LinkedBlockingQueue<Runnable> buffTasks = new LinkedBlockingQueue<>();

    ThreadPoolExecutor threadPoolExecutor;

    public ElasticityThreadPool(int  , int maximumPoolSize, int keepAliveTime) {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, tasks);
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        Thread managerThread = new Thread(() -> {
            while (true) {
                try {
                    Runnable buffTask = buffTasks.take();
                    try {
                        threadPoolExecutor.execute(buffTask);
                    } catch (RejectedExecutionException e) {
                        buffTasks.offer(buffTask);
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        managerThread.setName("ElasticityThreadPool-TaskManager");
        managerThread.start();
    }

    @Override
    public void execute(Runnable runnable) {
        buffTasks.offer(runnable);
    }
}

这个线程池使用了有界队列,而且是一个极端有界队列 SynchronousQueue , 当 corePoolSize 数量个线程全都忙碌时,新的线程将会被创建。而当 maximumPoolSize 个线程也无法满足任务的处理时, 饱和策略将会发挥作用

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

线程池会拒绝执行任务,并抛出 RejectedExecutionException 异常。

我们从代码中看到,还存在一个名叫 buffTasks 的误解队列, 此队列的用作任务的中转, 所有提交给线程池的任务事先都会被置入此队列

public void execute(Runnable runnable) {
  buffTasks.offer(runnable);
}

而管理线程则不断的从此队列中读取任务,并提交给真正的线程池

Thread managerThread = new Thread(() -> {
  while (true) {
    try {
      Runnable buffTask = buffTasks.take();
      try {
        threadPoolExecutor.execute(buffTask);
      } catch (RejectedExecutionException e) {
        buffTasks.offer(buffTask);
      }
    } catch (InterruptedException e) {
      break;
    }
  }
});
managerThread.setName("ElasticityThreadPool-TaskManager");
managerThread.start();
}

当线程池因为饱和而无法处理的任务通过饱和策略退回是,管理线程会再次将之存入 buffTasks, 以待后续处理

try {
    threadPoolExecutor.execute(buffTask);
} catch (RejectedExecutionException e) {
    buffTasks.offer(buffTask);
}

这里因为没有设置

allowCoreThreadTimeOut

所以被回收的线程按照线程池默认的机制处理,线程池总会保持 corePoolSize 个线程的随时就绪

线程池使用方法如下

ElasticityThreadPool threadPool = new ElasticityThreadPool(2, 10, 60);

int count = 0;
while (count < 200) {
  threadPool.execute(() -> {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("hello world");
  });
  count++;
}

这种方法的好处是实现了真正的可伸缩,坏处是多使用了一个额外的线程

两种方法的孰优孰劣看具体的使用场景而定。

上一篇下一篇

猜你喜欢

热点阅读