Java如何控制线程并行执行数量

2018-12-30  本文已影响0人  _男猪脚

有这样一种业务场景,从远端服务下载文件到本地,每个文件都10M+比较大,定时任务串行下载效率低下,所以考虑固定数量的线程并行去下载,这样可以控制对机器的资源消耗,同时提高下载吞吐量,对于这样的场景就需要控制线程同时执行时的数量。实现方式其实有多种,本人提供以下两种方式:

  1. 利用信号量(Semaphore ),通过acquire和release方法控制同时访问的线程个数。
public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        // 创建给定个数的公平模式的信号量
        final Semaphore sp = new Semaphore(3, true);
        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 获取許可
                        sp.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    /* 业务逻辑处理 .... */
                
                    // 释放許可
                    sp.release();
                }
            };
            pool.execute(runnable);
        }
    }
  1. 自定义线程池,重写拒绝策略,提交任务至任务队列由非阻塞的offer方法改成阻塞的put方法,任务队列满了之后,后续提交的任务就会阻塞,这样就能够有效控制执行的线程数。
    /**
     * 创建固定大小的阻塞式的线程池, 并且指定工作队列大小
     * @param poolSize 线程个数
     * @param queueSize 队列大小
     * @return
     */
    public static ExecutorService newBlockingThreadPool(int poolSize, int queueSize) {
        if (poolSize < 2) {
            poolSize = DEFAULT_POOL_SIZE;
        }
        if (queueSize < poolSize) {
            queueSize = poolSize;
        }
        ExecutorService es = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, 
                new LinkedBlockingQueue<Runnable>(queueSize), new RejectedExecutionHandler() {
            /**
              * 自定义拒绝策略, 当工作队列满时, 生产者调用put阻塞
             */
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (! executor.isShutdown()) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.error("Create blocking thread pool error !", e);
                    }
                }
            }
        });
        return es;
    } 
上一篇 下一篇

猜你喜欢

热点阅读