线程池

ReentrantLock限制线程池队列大小

2018-01-25  本文已影响6人  爱吃鱼aichiyu

关于线程池介绍,我不在此赘叙,请参考https://www.jianshu.com/p/ade771d2c9c0
线程池中queue一般设置大小默认是Integer.MAX_VALUE,如果设置了大小,就必须实现一个丢弃策略,而默认的丢弃策略居然是抛异常。

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

当任务量超大,内存被撑满造成宕机,会导致所有的任务都丢失了。当然,可以使用MQ来解决类似的问题。在此我们只讨论使用线程池本身来解决。
那能不能人为控制队列大小,当队列达到该值,就不再往线程池队列里提交任务呢?以下采用ReentrantLock可重入锁机制来实现

/**
 * Created on 2018/1/22 16:29
 * <p>
 * Description: [测试控制线程池队列大小]
 * <p>
 * Company: [xxxx]
 *
 * @author [aichiyu]
 */
public class TestLockPool {

    private int maxSize = 100 ;

    private final ReentrantLock lock = new ReentrantLock();
    private List<Condition> list = new LinkedList<>();

    private ThreadPoolExecutor executor =new ThreadPoolExecutor(20, 100,
                                      60L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);


    public void init(){
        scheduledExecutorService.scheduleAtFixedRate(()->{
            int queueSize = executor.getQueue().size();
            //每秒检查一次,当队列中任务被执行完就解锁一批任务,继续往队列中加
            if( queueSize < maxSize * 0.8 && list.size() > 0  ){
                System.out.println("unlock !!~~");
                lock.lock();
                int i = 0 ;
                Iterator<Condition> iterator = list.iterator();
                while (i < maxSize-queueSize && iterator.hasNext()){
                    iterator.next().signal();
                    iterator.remove();
                    i++;
                }
                System.out.println("signal over!!~~,num="+(i));
                lock.unlock();
            }
        },1,1, TimeUnit.SECONDS);
    }

    private void consume(){

        try {
            //当队列大小超过限制,阻塞当前线程,等待队列空闲
            if(executor.getQueue().size() >= maxSize ){
                System.out.println(Thread.currentThread()+" wait !!~"+"pool queue size = "+executor.getQueue().size());
                lock.lock();
                Condition condition = lock.newCondition();
                list.add(condition);
                condition.await();
                System.out.println(Thread.currentThread()+"wait over!~~");
                lock.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.submit(()->{
            System.out.println(Thread.currentThread()+" execute !!~~"+"pool queue size = "+executor.getQueue().size());


            try {
                //模拟任务阻塞
                Thread.sleep(2500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args)  {
        TestLockPool testLock = new TestLockPool();
        testLock.init();
         ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 200; i++) {
            service.submit(()->testLock.consume());
        }

        System.out.println("main over!~");
    }

}
上一篇下一篇

猜你喜欢

热点阅读