Thread Pool Rejection Policies by Example

2021-06-10  overflowedstack

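What eventually happens if you keep submitting tasks to a Java thread pool?
If the work queue is bounded, then once the queue is full, the number of threads has reached maximumPoolSize, and no thread is idle, the next submission triggers the pool's rejection policy.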
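
The condition above can be checked with a small sketch. It assumes the pool shape used in the examples of this post (1 core thread, maximum 2 threads, queue capacity 5); the class name CapacityDemo and its helper method are hypothetical. Every task blocks on a latch so that no slot frees up while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class CapacityDemo {

    /** Submits {@code total} blocking tasks and returns how many the pool accepted. */
    static int countAccepted(int core, int max, int queueCapacity, int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < total; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();   // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Queue full and pool at its maximum: the default AbortPolicy fired.
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 2 running + 5 queued = 7 accepted; the remaining 3 are rejected.
        System.out.println(countAccepted(1, 2, 5, 10));   // prints 7
    }
}
```

As long as no task finishes during submission, the number of accepted tasks is the maximum pool size plus the queue capacity.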

Which rejection policies does the thread pool offer?

1. AbortPolicy: reject the task and throw an exception (the default policy)

Definition
    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
Example
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
            }};
                
            threadPool.submit(task);
        }
    }
}
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6bc7c054 rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 7]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at demo.multithread.ThreadPoolRejectPolicyTest.main(ThreadPoolRejectPolicyTest.java:21)
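
Because AbortPolicy throws in the submitting thread, the caller can catch the exception and decide what to do with the task. The following is a minimal sketch of one option, a fixed-delay retry. The class AbortRetryDemo, the method names, and the 10 ms delay are all assumptions made for illustration, not something the original example does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original post.
class AbortRetryDemo {

    /** Tries to hand the task to the pool, pausing briefly after each rejection. */
    static boolean executeWithRetry(ThreadPoolExecutor pool, Runnable task, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                pool.execute(task);
                return true;
            } catch (RejectedExecutionException e) {
                if (pool.isShutdown()) {
                    return false;       // retrying can never succeed
                }
                try {
                    Thread.sleep(10);   // crude fixed backoff (assumed value)
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    /** Pushes {@code total} short tasks through a tiny pool and returns how many completed. */
    static int runAll(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));   // default handler is AbortPolicy
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < total; i++) {
            executeWithRetry(pool, completed::incrementAndGet, 1000);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed " + runAll(20));
    }
}
```

Whether retrying, logging, or failing fast is appropriate depends on the application; the point is only that with AbortPolicy the decision stays with the caller.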

2. DiscardPolicy: silently discard the task

Definition
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
Example

Eight tasks are submitted in total, and one of them is silently discarded.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
                    try {
                        Thread.sleep(60000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }};
                
            threadPool.submit(task);
        }
        
        // threadPool is already a ThreadPoolExecutor, so no cast is needed.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        // Number of tasks waiting in the work queue (tasks, not threads).
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}
Thread is submitted.
threadCount 2
threadCntInWorkQueue 5
Thread is submitted.
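
One consequence of combining DiscardPolicy with submit() is worth spelling out: the Future returned for the discarded task is never completed, so a plain get() on it would block forever. The sketch below illustrates this under the same pool settings as the example above; the class name DiscardFutureDemo is hypothetical, and a latch replaces the 60-second sleep so the run finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class DiscardFutureDemo {

    /** Returns how many of the 8 futures are still not done after the pool has terminated. */
    static int countNeverCompleted() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();   // hold the slot until all 8 tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            futures.add(pool.submit(blocker));   // the 8th submission is discarded
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        int pending = 0;
        for (Future<?> f : futures) {
            if (!f.isDone()) {
                pending++;   // this future belongs to the discarded task
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        System.out.println("futures never completed: " + countNeverCompleted());   // prints 1
    }
}
```

When using DiscardPolicy together with submit(), calling get(timeout, unit) instead of get() is one way to avoid waiting indefinitely on a discarded task.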

3. DiscardOldestPolicy: discard the oldest queued task, then retry the new one

Definition
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
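Example

The pool has 1 core thread, a maximum of 2 threads, and a work queue of capacity 5.
As the output shows, after 8 tasks are submitted the 2nd task (id 1) is discarded. It is the oldest task in the queue, because it was the first one to be queued (task 0 went straight to the core thread).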

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
        
        // threadPool is already a ThreadPoolExecutor, so no cast is needed.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        // Number of tasks waiting in the work queue (tasks, not threads).
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
threadCount 2
threadCntInWorkQueue 5
Running thread - id 6
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
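
To make it easier to see which task is dropped, here is a sketch that records the ids of the tasks that actually ran. It uses the same pool settings as the example above; the class DiscardOldestDemo is hypothetical, and a latch stands in for the one-second sleep so the result does not depend on timing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class DiscardOldestDemo {

    /** Submits tasks 0..7 and returns the sorted ids of the tasks that were executed. */
    static List<Integer> executedIds() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> ran = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < 8; i++) {
            final int id = i;
            pool.execute(() -> {
                ran.add(id);           // record that this task started
                try {
                    release.await();   // block until every task has been submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        List<Integer> sorted = new ArrayList<>(ran);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // Task 1 sat at the head of the queue when task 7 was rejected, so it is dropped.
        System.out.println(executedIds());   // prints [0, 2, 3, 4, 5, 6, 7]
    }
}
```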

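4. CallerRunsPolicy: the calling thread runs the task

Be careful with this policy: the submitting thread (the main thread in the example here) has to both submit tasks and run the rejected ones. While it is running a task it cannot submit anything else, which can hurt performance.

Definition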
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
Example

The output shows that the main thread's id is 1, and that one of the submitted tasks (the last one submitted, id 7) is run by the main thread.

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
        
        System.out.println("Main thread is " + Thread.currentThread().getId());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
        
        // threadPool is already a ThreadPoolExecutor, so no cast is needed.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        // Number of tasks waiting in the work queue (tasks, not threads).
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        System.out.println("The thread id is " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Main thread is 1
Running thread - id 0
The thread id is 9
Running thread - id 7
The thread id is 1
Running thread - id 6
The thread id is 10
Running thread - id 1
The thread id is 9
threadCount 2
threadCntInWorkQueue 3
Running thread - id 2
The thread id is 10
Running thread - id 3
The thread id is 9
Running thread - id 4
The thread id is 10
Running thread - id 5
The thread id is 9
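
The same behaviour can be confirmed without reading thread ids from the console. The sketch below is a reduced version under assumed settings (1 thread, queue capacity 1, chosen only to keep it short); the class CallerRunsDemo is hypothetical. It checks that the rejected task runs synchronously on the thread that called execute().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original post.
class CallerRunsDemo {

    /** Returns true if the rejected task ran on the thread that called execute(). */
    static boolean rejectedTaskRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);   // occupies the only worker thread
            pool.execute(blocker);   // fills the single queue slot
            // No thread and no queue slot left: the policy runs this task right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + rejectedTaskRanOnCaller());   // prints true
    }
}
```

Since execute() does not return until the rejected task has finished, the caller is throttled for exactly as long as that task takes.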

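5. A custom rejection policy

Looking back at the four built-in policies:
AbortPolicy, DiscardPolicy, and DiscardOldestPolicy all drop a task;
CallerRunsPolicy does run the task, but it slows down the main thread.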

Making the work queue unbounded, or setting the maximum pool size to Integer.MAX_VALUE, avoids rejection but can lead to an OutOfMemoryError.
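
The Executors factory methods are a common place where these two settings show up. The sketch below inspects the pools they return; note that it casts the result to ThreadPoolExecutor, which works on OpenJDK but is an implementation detail rather than a documented guarantee. The class name UnboundedDefaultsDemo is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original post.
class UnboundedDefaultsDemo {

    /** Remaining capacity of the queue behind Executors.newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the returned object is a ThreadPoolExecutor (true on OpenJDK).
            return ((ThreadPoolExecutor) es).getQueue().remainingCapacity();
        } finally {
            es.shutdown();
        }
    }

    /** Maximum pool size behind Executors.newCachedThreadPool. */
    static int cachedPoolMaxThreads() {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) es).getMaximumPoolSize();
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);   // prints true
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);     // prints true
    }
}
```

Neither pool ever invokes its rejection policy while running, so under sustained overload the backlog of tasks or threads simply keeps growing.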

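A custom rejection policy can instead make the submitter of a late task block until capacity frees up, and only then hand the task over. That way every task gets executed.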

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomRejectPolicy());
                
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
                
        threadPool.shutdown();
    }
}

class CustomRejectPolicy implements RejectedExecutionHandler {

    @Override
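    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // Block the submitting thread until the queue has room for the task.
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the task was not queued.
            Thread.currentThread().interrupt();
            throw new java.util.concurrent.RejectedExecutionException(
                    "Interrupted while waiting for queue space", e);
        }
    }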
    
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
Running thread - id 6
Running thread - id 1
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
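
A blocking policy based on put() waits for as long as the queue stays full, and if the pool is shut down in the meantime the task may end up in a queue that no worker drains any more. One possible variation, sketched here under assumptions, waits only for a bounded time and refuses tasks after shutdown. The class BlockWithTimeoutPolicy and the 100 ms timeout are hypothetical and not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical policy, not part of the original post.
class BlockWithTimeoutPolicy implements RejectedExecutionHandler {
    private final long timeout;
    private final TimeUnit unit;

    BlockWithTimeoutPolicy(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // Wait for a free queue slot, but only up to the configured timeout.
            if (!executor.getQueue().offer(r, timeout, unit)) {
                throw new RejectedExecutionException("Queue still full after waiting");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }

    /** Demo: with one busy worker and a full queue, the third task times out. */
    static boolean thirdTaskTimesOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new BlockWithTimeoutPolicy(100, TimeUnit.MILLISECONDS));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);   // runs on the only worker
            pool.execute(blocker);   // waits in the queue
            pool.execute(blocker);   // blocks for 100 ms, then is rejected
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task timed out: " + thirdTaskTimesOut());   // prints true
    }
}
```

The trade-off is that tasks can be rejected again, so the caller still needs a plan for the exception; in exchange, a stuck pool can no longer hang the submitter indefinitely.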