十三、多线程执行器

2020-07-19  本文已影响0人  blank_white

十三、多线程执行器

FutureTask 和 Callable

可以用来启动一个需要很长时间的计算任务


        // Callable 与 Runnable 类似,但是有返回值
        Callable<Integer> callable=()->{
            int sum=0;
            for (int i = 0; i < 1000000000; i++) {
                sum+=i;
            }
            return sum;
        };


        // FutureTask 可以将 Callable 包装成实现了 Runnable 和 Future 接口的对象
        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        // 开启线程运行任务
        Thread thread=new Thread(futureTask);
        thread.start();

        
        // 获取任务的执行结果,未计算完会阻塞
        // 如果计算线程被中断,抛出InterruptedException
        futureTask.get();
        //
        try {
            // 设置最长阻塞时间,超时还没得出结果会抛出异常
            futureTask.get(10L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


        // 如果计算还没开始,取消该任务。如果计算处于运行中,且参数为 true ,则计算会被中断
        futureTask.cancel(true);

        futureTask.isDone();
        futureTask.isCancelled();

        System.out.println(futureTask.get());
线程池

线程创建是有消耗的,可以提前创建一些线程,需要的时候直接使用。维护这些线程的池子,也就是线程池

执行器

通过 Executors 的工厂方法获得线程池的执行器 ExecutorService

用 ExecutorService 来执行线程并得到一个 Future

用 Future 监视线程执行状况


        // 执行器 
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 必要时创建新线程,空闲线程会被保留 60秒
        Executors.newCachedThreadPool();
        // 包含固定数量的线程,空闲线程会一直被保留,参数为线程数
        Executors.newFixedThreadPool(32);
        // 只有一个线程的“池”,会按顺序执行提交的任务
        Executors.newSingleThreadExecutor();
        // 用于预定执行一些任务而构建的固定线程池,支持定时及周期性的执行任务,参数为线程数
        Executors.newScheduledThreadPool(32);
        // ScheduledThreadPool 的单线程“池”版
        Executors.newSingleThreadScheduledExecutor();


        Runnable runnable=()->{};
        Callable callable=()->1;
        Future future;
        // 提交一个 Runnable 或者 Callable 对象给 Executor 并返回一个 future 来监控执行
        // 调用 future.get() 会返回 null ,runnable 没返回值
        future=executorService.submit(runnable);
        // 调用 future.get() 会在任务完成后,把第二个参数传入的对象返回
        future=executorService.submit(runnable,new Object());

        future=executorService.submit(callable);


        // 当不在需要提交任何任务的时候调用,关闭执行器,不再接受新任务,当所有任务完成后,线程池的线程死亡
        executorService.shutdown();
        // 会取消尚未开始的任务,并且试图中断正在运行的线程
        executorService.shutdownNow();

预定和周期性的执行任务

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12);
        
        Runnable runnable=()->{};
        Callable callable=()->1;
        // 在指定延迟后执行任务 , 在 10s 后执行任务
        executorService.schedule(runnable,10, TimeUnit.SECONDS);
        executorService.schedule(callable,10, TimeUnit.SECONDS);
        // 在指定延迟后周期性的执行任务, 10s 后开始,每天都在这个时间执行任务
        executorService.scheduleAtFixedRate(runnable,10,60*60*24,TimeUnit.SECONDS);
        // 在指定延迟后,固定间隔的执行任务, 10s 后开始,执行完任务隔 1 小时后,再次执行任务
        executorService.scheduleWithFixedDelay(runnable,10,60*60*1, TimeUnit.SECONDS);
任务组

可以控制一组任务的执行

invokeAny : 任一任务得出结果,即返回结果

ExecutorCompletionService:可以先取出有结果的任务


        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Callable<Integer> callable1=()->1;
        Callable<Integer> callable2=()->2;
        //Callable<Integer> callable2=()->{while (true);};

        List<Callable<Integer>> callables=new ArrayList();
        callables.add(callable1);
        callables.add(callable2);

        // 执行集合中的所有任务,且任意一个执行完成得出结果,即将结果返回,但是其他未执行完的线程仍会继续执行
        // 可以用在比如说多线程分段查找,任何一个线程找到了就可以返回结果了
        Integer result = executorService.invokeAny(callables);
        // 可以设置超时时间,超时未得出结果会抛出 TimeOutException
        executorService.invokeAny(callables,1,TimeUnit.SECONDS);


        // 2,1,1,2,2,1,1,1,1,1,  且每次程序运行结果不尽相同
        for (int i = 0; i < 10; i++) {System.out.print(executorService.invokeAny(callables)+",");}



        // 执行集合中所有任务,并返回 Future 的集合 ,此方法会等所有任务都有结果了才返回
        List<Future<Integer>> futures = executorService.invokeAll(callables);
        // 与 invokeAny 一样也有带超时参数的版本
        executorService.invokeAll(callables,1,TimeUnit.SECONDS);



        // 构建一个能收集完成服务执行器,内部会维护一个阻塞队列 BlockingQueue,包含已完成执行的任务的future
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
        for (Callable<Integer> callable : callables) {
            executorCompletionService.submit(callable);
        }
        System.out.println("******");
        // take 方法取出一个已经执行完的任务的 future ,如果没有完成的任务,则会阻塞(底层的 BlockingQueue.take() 阻塞了)
        executorCompletionService.take().get();
        executorCompletionService.take().get();
        // 只有两个任务结果已经全部取出,要是再取就阻塞了
        //executorCompletionService.take().get();

        // 取出一个完成的任务结果,如果没有 返回 null
        executorCompletionService.poll();
        // 没有结果的时候会等待给定时间 (就是BlockingQueue.poll)
        executorCompletionService.poll(2,TimeUnit.SECONDS);

        executorService.shutdown();
Fork-Join 框架

Jave SE 7 提供了 fork-join 框架,可以将任务分段并行执行

一个计算1-1000(不含1000)的和 ,用法如下,



public class MyForkJoin {

    public static void main(String[] args) {

        int start=1;
        int end=1000;
        // 创建线程池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        // 创建任务,
        MyTask myTask = new MyTask(start, end);
        // 执行任务
        forkJoinPool.invoke(myTask);
        // 获得任务执行结果
        System.out.println(myTask.join());

        // 一般算法对比看计算结果是否准确
        System.out.println(sum(start,end));
    }


    static  int sum(int start,int end){
        int sum=0;
        for (int i = start; i < end; i++) {
            sum+=i;
        }
        return sum;
    }
}
// 如果需要返回计算结果 则继承 RecursiveTask<T>
// 如果不需要生成结果,则继承 RecursiveAction
class MyTask extends RecursiveTask<Integer> {
    // 设置分组每组长度的界限
    static int threshold = 10;
    // 分组起始
    int start;
    // 分组结束
    int end;

    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // 小于每组长度就不分了 直接执行计算
        if (end - start <= threshold) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 继续分组
            int mid = (start + end) / 2;
            MyTask first = new MyTask(start, mid);
            MyTask second = new MyTask(mid, end);
            // 执行两个子任务 , 这明明是个任务类,那么线程池是从哪里来的呢, ForkJoinWorkerThread 内有一个 pool ,通过 Thread.currentThread() 拿到线程再拿到 pool
            invokeAll(first, second);
            // 因为这个任务的逻辑是求总和,将两个子任务求和,作为本任务的结果返回
            return first.join() + second.join();
        }
    }
}
CompletableFuture

可以在任务完成后,按顺序执行一些工作的 Future

并没有明白具体怎么用,见 执行器-可完成的Future


        // public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
        CompletableFuture completableFuture = new CompletableFuture();

        // 每个方法都返回 CompletableFuture ,在 Future 完成后执行后面的操作,且这些方法是非阻塞的
        completableFuture.thenApply((t)->1);
        completableFuture.thenCompose((t)->1);
        //  completableFuture.thenXxxxx 还有很多方法

        // 可以实现一个任务完成后按顺序执行一些工作
        completableFuture.thenApply((t)->1).thenCompose((t)->1).thenAccept((t)->{});
上一篇下一篇

猜你喜欢

热点阅读