6-ExecutorService

2020-02-11  本文已影响0人  鹏程1995

概述

引入

我们之前记录了Executor接口,它是用来将子任务的创建和启动两个操作进行解耦而出现的,我们在上一节中的“使用思路“构建了n个子线程用来不停的执行到来的任务。但是我们的例子使用并不方便,比如:

  1. 我们只提供无返回结果的子任务执行,如果有结果穿出就得自己在子任务中做变量同步
  2. 只提供了执行接口,并利用守护线程和用户线程的关系完成退出,没有提供标准的线程池的关闭功能
  3. 只有单独执行,没有批量操作

我们本次介绍的新的接口ExecutorService也是基于这些方面对Executor进行了拓展。

摘要

本文主要介绍ExecutorService的各个接口的设计意图,并对上一节我们设计的例子进行了进一步的完善。

类介绍

类定位

ExecutorService是线程池框架的一个重要的借口,它定义了一个作为线程池的实现类必须要实现的几个方法。

注意

在实现ExecutorService接口时要注意线程安全,此类是有可能同时被多个线程操作的,比如同时加入任务,同时调用关闭等等。

源码解读

提交任务

/**
 * 提交一个有返回值的任务,并返回一个可以检测此任务状态和取消此任务的 Future 。
 * 此函数只是提交任务,具体是直接执行还是排队等着就看各自实例的实现了。【有点像 Executors.execute()】
 *
 * 
 * @param task 任务
 * @param <T> 返回值类型
 * @return 代表此任务的 Future
 * @throws RejectedExecutionException 拒绝执行此任务时抛出
 * @throws NullPointerException 入参为空时抛出
 */
<T> Future<T> submit(Callable<T> task);

/**
 * 提交一个没有返回值的任务,和完成此任务后返回的值,其他和上面的一样
 *
 */
<T> Future<T> submit(Runnable task, T result);

/**
 * 返回的结果是 null ,其他的和上面一样
 *
 */
Future<?> submit(Runnable task);

批量提交任务

/**
 * 批量提交任务并进行执行、等待。
 * 注意这里不同上面单次提交任务,这里提交后要等待任务全部执行完成后才能返回,
 * 如果你主线程中有其他的并行操作而且追求调用效率,可以自己用循环调用 submit ,如果图方便的话还是可以直接
 * 调用这个的。
 * 入参的集合结构不能变,入参的集合结构不能变,入参的集合结构不能变,重要的话说三次
 *
 * @param tasks 任务集合
 * @param <T> 任务集返回值类型
 * @return 一串代表任务的 Future ,和入参集合的遍历顺序相同
 * @throws InterruptedException 等待执行结果时遇到了线程中断时抛出,未完成的任务会被取消
 * @throws NullPointerException 入参集合为 null 或者入参集合中有元素为 null时抛出
 * @throws RejectedExecutionException 如果有任务被拒绝执行时抛出
 */
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

/**
 * 批量执行任务、并限时等待。
 * 时间到达时未完成的任务会被取消。其他的和上面函数相同
 *
 */
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;

/**
 * 批量执行任务,返回第一个完成执行的任务,并取消其他的任务。
 *
 * 其他的和上面的 invokeAll() 一样 
 *
 * @throws ExecutionException 如果提交的所有任务都没有执行成功
 */
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

/**
 * 增加了限时要求。
 *
 *
 * @throws TimeoutException 在给定时间内没有一个执行成功时抛出
 */
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

状态检测

/**
 * 判断线程池是否被关闭【不接受新任务】
 */
boolean isShutdown();

/**
 * 判断线程池是否已经终结【不接受新任务而且现有的所有任务都结束,可以安全回收此实例的那种】
 */
boolean isTerminated();

关闭

/**
 * 开始进行有序的关闭,不允许再提交任务,现有的任务会继续执行。
 *
 * 此方法会立刻返回,你需要调用 awaitTermination() 等待现有任务执行完
 *
 *
 * @throws SecurityException 【后面再详细了解吧】
 */
void shutdown();

/**
 * 尝试终止所有正在执行的任务,停止等待的任务,然后把等待的任务列表返回。
 *
 * 终止正在执行的任务我们仅做了线程中断,至于子线程什么时候反应,要多久反应,
 * 此方法不会阻塞等待,如果有需要就用 awaitTerminantion() 阻塞主线程等待
 *
 * 如果有子线程对中断不反应,继续执行他的任务,我们也没办法,我们只能告诉它“该停了”,
 * 其他的也没什么用了。
 *
 * @return 提交后未执行的任务
 * @throws SecurityException 
 */
List<Runnable> shutdownNow();

使用示例

示例代码——MyExecutorService

package com.gateway.concurrent.pool;

import java.util.*;
import java.util.concurrent.*;

public class MyExecutorService implements ExecutorService {

    private final Thread[] threads = new Thread[POOL_SIZE];

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();

    private volatile boolean ifAcceptNewTask = true;

    private volatile boolean ifTryToStopThread = false;

    private final static int POOL_SIZE = 5;

    private final static long SLEEP_UNIT = 1000;

    public MyExecutorService() {
        for (int i=0;i<POOL_SIZE;i++){
            threads[i] = new Thread(()->{
                while (true){
                    // 取任务
                    try {
                        Runnable runnable = null;
                        runnable = tasks.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        // 取任务时被中断
                        e.printStackTrace();
                        if (ifTryToStopThread){// 要停止线程了
                            System.out.println("此线程收到结束中断,退出执行");
                            break;
                        }

                    }

                }

            });
            threads[i].start();
        }
    }

    @Override
    public void shutdown() {
        // 不接受新任务
        this.ifAcceptNewTask = false;

    }

    @Override
    public List<Runnable> shutdownNow() {
        this.ifAcceptNewTask = false;
        this.ifTryToStopThread=true;

        List<Runnable> unExecutedTasks = new ArrayList<>();
        tasks.drainTo(unExecutedTasks);

        for (int i =0;i<POOL_SIZE;i++){
            if (threads[i].isAlive()){
                threads[i].interrupt();
            }
        }
        return unExecutedTasks;
    }

    @Override
    public boolean isShutdown() {
        return this.ifAcceptNewTask;
    }

    @Override
    public boolean isTerminated() {
        for (int i=0;i<POOL_SIZE;i++){
            if (threads[i].isAlive()){
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        Long deadLine = System.currentTimeMillis()+unit.toNanos(timeout);
        while (!isTerminated()){
            // 超时
            if (System.currentTimeMillis() > deadLine){
                return false;
            }
            // 等待
            if (System.currentTimeMillis() - deadLine > SLEEP_UNIT){
                Thread.currentThread().sleep(SLEEP_UNIT);
            }
        }
        return true;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
      // TODO 这里加一下对线程池状态的判断,不要直接加
        FutureTask<T> temp = new FutureTask<>(task);
        try {
            tasks.put(temp);
        } catch (InterruptedException e) {
            e.printStackTrace();

            // 被打断干脆就不等了,其实讲真估计也不会到这一步,
            // 毕竟LinkedBlockingQueue是无界到,也不会满
            throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
        }

        return temp;
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return this.submit(()->{
            task.run();
            return result;
        });
    }

    @Override
    public Future<?> submit(Runnable task) {
        return this.submit(task,null);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (Objects.isNull(tasks)){
            throw new NullPointerException("入参不能为空");
        }

        // 创建任务
        Iterator<Callable<T>> iterator = (Iterator<Callable<T>>) tasks.iterator();
        List<Future<T>> result = new ArrayList<>();

        while (iterator.hasNext()){
            result.add(this.submit(iterator.next()));
        }

        // 等待任务完成
        Iterator<Future<T>> resultIterator = result.iterator();
        while (resultIterator.hasNext()){
            try {
                resultIterator.next().get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        return result;
    }

    // 思路差不多,不写了
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return null;
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }

    @Override
    public void execute(Runnable command) {
        if (Objects.isNull(command)){
            throw new NullPointerException("入参不能为空");
        }
        try {
            this.tasks.put(command);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 被打断干脆就不等了,其实讲真估计也不会到这一步,
            // 毕竟LinkedBlockingQueue是无界到,也不会满
            throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
        }

    }


}

示例代码——Main函数

package com.gateway.concurrent.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Pool1 {
    public static void main(String args[]){
        MyExecutorService myExecutorService = new MyExecutorService();
        List<Future<String>> futureTasks = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            futureTasks.add(myExecutorService.submit(new TestCalss("任务" + i)));
        }

        // 主线程继续做一些别的事
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        for (int i = 0; i < futureTasks.size(); i++) {
            try {
                System.out.println(futureTasks.get(i).get());
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("等待过程中被打断,重新等待这个任务结束");
                i--;

            } catch (ExecutionException e) {
                e.printStackTrace();
                System.out.println("任务执行失败");
            }
        }

        // 关闭线程池
        myExecutorService.shutdown();

        try {
            // 如果可以正常关闭,最好
            // 当然,我们自己写到代码,MyExecutorService 是肯定不能正常关闭的
            // 我们这里的等待时间是给任务队列中尚未执行的任务的
            if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
                return;
            }

            myExecutorService.shutdownNow();

            // 我们这里的等待时间是给线程池中的线程停止正在执行的任务的
            if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
                return;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    static class TestCalss implements Callable<String> {

        private String name;

        public TestCalss(String name) {
            this.name = name;
        }

        @Override
        public String call() throws Exception {
            Long sleepTime = (long) (Math.random() * 1000);
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return name + " 完成,用时 " + sleepTime + " 毫秒";
        }
    }

}

核心逻辑介绍

没啥说的,个人感觉相比于上一个版本,最大的进步就是:

  1. 可以返回值了
  2. 可以主动控制线程池的关闭回收了【这次我们没有将线程池中的线程设置为守护线程,你能正常运行完程序就表示你成功关闭了线程池里的五条线程】

使用思路

使用更加简便,我们一般都是作为线程池的使用者的,所以在大多数情况下不是很关系线程池的内部逻辑,我们调用的思路是:

  1. 创建/获得一个线程池实例
  2. 任务入线程池
  3. 获得任务结果
  4. 完成所有操作后关闭线程池

扩展

参考文献

上一篇 下一篇

猜你喜欢

热点阅读