线程池的拒绝策略示例
2021-06-10 本文已影响0人
overflowedstack
Java的线程池中,如果不断往线程池提交任务,最终会发生什么?
如果work queue是一个有界队列,队列放满,线程数量达到maxsize,且没有空闲线程时,再往线程池提交任务会触发线程池的拒绝策略。
线程池有哪些拒绝策略呢?
1. AbortPolicy 丢弃并抛出异常-- 默认策略
定义
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
for (int i = 0; i < 8; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Thread is submitted.");
}};
threadPool.submit(task);
}
}
}
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6bc7c054 rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 7]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at demo.multithread.ThreadPoolRejectPolicyTest.main(ThreadPoolRejectPolicyTest.java:21)
2. DiscardPolicy 直接丢弃任务
定义
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
示例
一共提交8个任务,其中有一个默默被丢弃。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 8; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Thread is submitted.");
try {
Thread.sleep(60000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}};
threadPool.submit(task);
}
int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
System.out.println("threadCount " + threadCount);
int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
threadPool.shutdown();
}
}
Thread is submitted.
threadCount 2
threadCntInWorkQueue 5
Thread is submitted.
3. DiscardOldestPolicy 丢弃最旧的任务,再试着处理
定义
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
示例
线程池1个核心线程,max线程数为2,work queue大小为5.
可以看到,提交8个任务后,第2个任务被丢弃了。因为第2个任务是oldest,第一个被放进queue的任务。
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 8; i++) {
Runnable task = new MyTask(i);
threadPool.submit(task);
}
int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
System.out.println("threadCount " + threadCount);
int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
threadPool.shutdown();
}
}
class MyTask implements Runnable {
private int taskId;
MyTask(int i) {
taskId = i;
}
@Override
public void run() {
System.out.println("Running thread - id " + taskId);
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
}
Running thread - id 0
threadCount 2
threadCntInWorkQueue 5
Running thread - id 6
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
4. CallerRunsPolicy 由调用者线程执行任务
用这种拒绝策略时要注意,主线程既需要负责创建线程,又需要执行任务,会造成性能问题。
定义
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
示例
在输出中,能看出,主线程号为1,而提交的任务中,其中一个任务(最后一个被提交的任务)就是由主线程来执行的。
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
System.out.println("Main thread is " + Thread.currentThread().getId());
for (int i = 0; i < 8; i++) {
Runnable task = new MyTask(i);
threadPool.submit(task);
}
int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
System.out.println("threadCount " + threadCount);
int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
threadPool.shutdown();
}
}
class MyTask implements Runnable {
private int taskId;
MyTask(int i) {
taskId = i;
}
@Override
public void run() {
System.out.println("Running thread - id " + taskId);
System.out.println("The thread id is " + Thread.currentThread().getId());
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
}
Main thread is 1
Running thread - id 0
The thread id is 9
Running thread - id 7
The thread id is 1
Running thread - id 6
The thread id is 10
Running thread - id 1
The thread id is 9
threadCount 2
threadCntInWorkQueue 3
Running thread - id 2
The thread id is 10
Running thread - id 3
The thread id is 9
Running thread - id 4
The thread id is 10
Running thread - id 5
The thread id is 9
5. 自定义拒绝策略
了解了前四种拒绝策略,发现:
abort,discard,discardOldest都会丢弃任务;
callerRun虽然执行了任务,但是会影响主线程性能。
若将work queue设置为无界队列,或者将maxsize设置为最大整数,都有可能造成out of memory。
那么可以通过自定义拒绝策略,让后进来的task阻塞住,有资源了再处理。这样可以让每一个任务都得到执行。
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomRejectPolicy());
for (int i = 0; i < 8; i++) {
Runnable task = new MyTask(i);
threadPool.submit(task);
}
threadPool.shutdown();
}
}
class CustomRejectPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MyTask implements Runnable {
private int taskId;
MyTask(int i) {
taskId = i;
}
@Override
public void run() {
System.out.println("Running thread - id " + taskId);
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
}
Running thread - id 0
Running thread - id 6
Running thread - id 1
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7