Java

java多线程-ForkJoinPool

2019-03-03  本文已影响126人  探索者_逗你玩儿

ForkJoinPool 是jdk1.7引入的线程池,不同于其他的线程池,过去我们在线程池解决问题时,通常维护了一个阻塞的任务队列。每个工作线程在任务完成后,就会去任务队列里面寻找任务。这种方式在我们执行数量较多且不互相依赖的任务时非常方便且高效。但是当我们需要执行一个很大的任务时,普通的线程池似乎就很难有什么帮助了。ForkJoinPool核心思想就是把大的任务分拆成一个个的小任务执行,然后再将执行结果进行聚合,这种思想很类似于MapReduce,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。所谓Work-Steeling机制就是指ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。
在ForkJoinPool中比较重要的两个组件一是ForkJoinPool即线程池,二是ForkJoinTask 分拆的子任务。
ForkJoinPool是多个线程队列容器,用于存放分拆的子任务,同时兼顾来线程池的特性,可以对线程进行监控。
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
接下来我们通过代码来看看

public class ForkJoinPoolDemo {

    static List<Integer> list = Lists.newCopyOnWriteArrayList();

    static class MyRecursiveAction extends RecursiveAction{

        private final int LIMIT = 20;

        private int start;

        private int end;



        public MyRecursiveAction(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            int sum = 0;
            if((end - start)<= LIMIT){
                for (int i = start; i <= end; i++)
                    sum += i;
            }else{
                int total = (this.end + this.start)/2;
                MyRecursiveAction leftTask = new MyRecursiveAction(start,total);
                MyRecursiveAction rightTask = new MyRecursiveAction(total+1,end);
                invokeAll(leftTask,rightTask);
//                leftTask.fork();
//                rightTask.fork();
            }
            System.out.println("list = " + sum);
        }
    }

    static class MyRecursiveTask extends RecursiveTask<Integer>{

        private final int LIMIT = 20;

        private int start;

        private int end;

        public MyRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            if((end - start) < LIMIT){
                System.out.println(start+ " ===="+ end);
                for (int i = start; i <= end; i++){
                    sum += i;
                }
                return sum;
            }else {
                System.out.println("MyRecursiveTask.开始分解");
                int total = (this.end + this.start) / 2;
                MyRecursiveTask leftTask = new MyRecursiveTask(start, total);
                MyRecursiveTask rightTask = new MyRecursiveTask(total+1, end);
//                invokeAll(leftTask,rightTask);
                leftTask.fork();
                rightTask.fork();
                return leftTask.join() + rightTask.join();
            }

        }
    }


    public static void main(String[] args){
        ForkJoinPool pool = new ForkJoinPool();
        MyRecursiveAction task = new MyRecursiveAction(0,100);
        MyRecursiveTask task1 = new MyRecursiveTask(0,100);
        pool.invoke(task);
//        pool.submit(task1);
        Integer a = pool.invoke(task1);
        System.out.println("addddd = " + a);
    }




}
上一篇下一篇

猜你喜欢

热点阅读