Java7新特性8-fork/join框架

2017-04-11  本文已影响190人  不迷失

Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,使得应用能充分利用线程进行并行计算,并减少了线程间的竞争。

所谓Fork就是把一个大任务切分为若干子任务并行的执行,Join则是合并这些子任务的执行结果,最后得到这个大任务的结果。

比如计算1+2+。。+1000,可以分割成10个子任务,每个子任务分别对100个数进行求和,最终汇总这10个子任务的结果。

Fork/Join的运行流程

2121

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。分割的子任务分别放在一个双端队列里.

第二步执行任务并合并结果。然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个单独队列里,启动一个线程从队列里拿数据,然后合并这些数据。

实现

Fork/Join使用两个类来完成以上两件事情:

ForkJoinTask

我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。

通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

ForkJoinPool

ForkJoinTask需要通过ForkJoinPool来执行,它是一个特殊的ExecutorService.任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

示例

下面我们通过一个例子来看看怎么使用fork/join框架。

假如我们要计算从1到300的和,我们希望每个子任务最多不能计算超过100个数字,只要超出100个数字的计算,就进一步拆分,直到要计算的数字个数在100个内。

package java7;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * 基于fork/join计算两个数字之间的累加值
 * @author qiang.xie
 * @date 2017/4/10
 */
public class ForkJoin extends RecursiveTask<Integer>{

    final int step=100;//表示每个任务最多只计算100个数字的和,比如从1加到200就分层1+100和101+200两个任务

    private int from;//从哪里开始计算

    private int to;//到哪里结束计算


    public ForkJoin(int from,int to){
        this.from=from;
        this.to=to;
    }

    //重写此方法,用于计算任务结果    
    @Override
    protected Integer compute() {
        if((to-from)<step){
            //小于100个数,直接计算
            return sum(from,to);
        }
        //拆分任务,一分为二
        int middle=(from+to)/2;
        ForkJoin task1=new ForkJoin(from,middle);
        ForkJoin task2=new ForkJoin(middle+1,to);

        //执行子任务(异步)
        task1.fork();
        task2.fork();

        //等待子任务结果
        int t1=task1.join();
        int t2=task2.join();
        return t1+t2;

    }

    private int sum(int from,int to){
        System.out.println("from:"+from+",to:"+to);
        int sum=0;
        for(int i=from;i<=to;i++){
            sum+=i;
        }
        return sum;
    }

    public static void main(String[] arg) throws Exception {
        //fork/join需要ForkJoinPoll执行
        ForkJoinPool pool=new ForkJoinPool();
        System.out.println(pool.submit(new ForkJoin(1,300)).get());
    }
}

运行结果:

from:1,to:75
from:151,to:225
from:76,to:150
from:226,to:300
45150

@不迷失|知识改善生活

weixinweixin
微信公众号:java技术

专注技术研究与视频教学,分享有价值的技术与经验,关注程序员的发展!

--
技术博客:http://bumishi.cn

技术交流群:245130488

@不迷失教学视频

QQ课堂:http://bumishi.ke.qq.com

百度传课:http://chuanke.com/s3377987.html

上一篇下一篇

猜你喜欢

热点阅读