程序员小天地技术干货Java 杂谈

并行执行任务的Fork/Join框架

2017-06-22  本文已影响0人  小草莓子桑

背书中(引用书上的话):Java7中提供了用于并行执行任务的Fork/Join框架, 可以把任务分成若干个分任务,最终汇总每个分任务的结果得到总任务的结果。这篇我们来看看Fork/Join框架。

先举个栗子

一个字符串数组,需要把每个元素中的*字符的索引返回,并求和(自己编了个栗子,没有撒实际意义),用Fork/Join框架来实现,可以定义一个处理字符串数组的总任务,然后把总任务拆分,把数组中每个字符串交给子任务去处理,然后等待子任务执行完毕,汇总结果,并返回:

package thread.ForkJoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @Description: .
 * @Author: ZhaoWeiNan .
 * @CreatedTime: 2017/6/21 .
 * @Version: 1.0 .
 */
public class StringTask extends RecursiveTask<Integer>{
    //要处理的字符串
    private String dest;

    public StringTask(String dest) {
        this.dest = dest;
    }
    //父类RecursiveTask是一个抽象类,所以需要实现compute方法
    @Override
    protected Integer compute() {
        if (dest == null || "".equals(dest))
            return 0;
        //判断字符串中 * 的索引,并返回
        return dest.indexOf("*");
    }
}

class ArrayTask extends RecursiveTask<Integer>{
    //需要处理的字符串数组
    private String[] array;

    public ArrayTask(String[] array) {
        this.array = array;
    }

    @Override
    protected Integer compute() {
        if (array == null || array.length < 1)
            return 0;

        //申明一个StringTask变量,作为子任务
        StringTask stringTask;
        //定义一个子任务队列,用于任务执行完毕后,获取子任务的执行结果
        List<StringTask> list = new ArrayList<>();
        int sum = 0;
        //把字符串数组的中每一个字符串分给多个StringTask子任务去处理
        for (String s : array){
            //创建一个变量,作为子任务去处理字符串
            stringTask = new StringTask(s);
            //执行子任务
            stringTask.fork();
            //加入子任务队列
            list.add(stringTask);
        }

        for (StringTask task : list){
            //等子任务执行完毕,获取子任务执行的结果,并累加
            sum += task.join();
        }

        return sum;
    }
}

class Demo{

    public static void main(String[] args){
        //初始化字符串数组
        String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
        //创建一个总任务,处理字符串数组
        ArrayTask arrayTask = new ArrayTask(array);
        //创建执行任务的线程池ForkJoinPool对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //执行总任务
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);

        //返回任务的结果
        try {
            System.out.println(forkJoinTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

代码放到了开源中国:http://git.oschina.net/zhaoweinan/forkjoin,有兴趣的小伙伴可以拿去
总的来说,Fork/Join框架就是一个用来并行执行任务的框架,可以把一个大任务,分成若干个子任务,等各个子任务执行完毕,可以把他们的执行结果获取到,并汇聚,起到了并行执行任务作用。

Fork/Join框架的构成

1.ForkJoinPool

ForkJoinPool类图

ForkJoinPool继承了AbstractExecutorService抽象类,AbstractExecutorService实现了ExecutorService接口,由此看来ForkJoinPool也是线程池家族的一员,


过滤了下方法,只显示了公共方法,并截取了一下

ForkJoinPool使用invoke、execute、submit用来执行任务。

2.ForkJoinTask

ForkJoinTask类图

ForkJoinTask是Fork/Join框架使用的任务类,实现了Future接口,我们一般使用它的两个子类RecursiveTask和RecursiveAction,


RecursiveAction类图
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

RecursiveAction适用于没有返回结果的任务,

RecursiveTask类图
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

RecursiveTask适用于有返回值的任务。

Work-Stealing (工作窃取)

粗略说一下Work-Stealing,ForkJoinPool具有 Work-Stealing (工作窃取)的能力,什么意思呢?就拿文章开头的栗子来说,把处理字符串数组的大任务,分成了若干个处理字符串的子任务,这些子任务线程执行完毕后,不会闲着,回去执行别的子任务,通俗的来说,Work-Stealing (工作窃取)就是线程从其他队列里面获取任务来执行。

Work-Stealing的优点

充分利用了线程,提高了线程并行执行任务的效率,并减少了线程间竞争带来的系统开销。

Work-Stealing的缺点

存在竞争的情况,而且占用了更多的系统资源。

Fork/Join框架原理

ForkJoinPool分析

贴一张ForkJoinPool的类图


大小不好调整,就截取一般吧

注意箭头所指的两个属性,
ForkJoinTask<?>数组submissionQueue,存放程序加到ForkJoinPool的任务

    private ForkJoinTask<?>[] submissionQueue;

ForkJoinWorkerThread类继承了Thread,是一个线程类, ForkJoinWorkerThread[] workers就是一个线程数组,负责去执行submissionQueue中的任务

    ForkJoinWorkerThread[] workers;

    .....
    public class ForkJoinWorkerThread extends Thread

ForkJoinTask分析

fork方法

获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行ForkJoinTask任务

   public final ForkJoinTask<V> fork() {
        //获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行任务
        ((ForkJoinWorkerThread) Thread.currentThread())
                .pushTask(this);
        return this;
    }

再来看看ForkJoinWorkerThread的pushTask方法:

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                //调用线程池ForkJoinPool的signalWork方法
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

ForkJoinWorkerThread的pushTask方法把任务ForkJoinTask加到了ForkJoinTask[]任务数组中,并调用了ForkJoinPool线程池的signalWork方法唤醒线程或者创建一个线程去执行任务,粗略的贴一下signalWork的关键代码:

private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }

最终调用到了这里,来执行任务。

join方法

从文章开头的栗子来看,join方法会阻塞当前线程,等待获取任务执行的结果

    //百度了这四种状态的含义
    private static final int NORMAL      = -1;   //NORMAL已完成
    private static final int CANCELLED   = -2;  //CANCELLED已取消
    private static final int EXCEPTIONAL = -3;  //EXCEPTIONAL出现异常
    private static final int SIGNAL      =  1;  //SIGNAL信号

     public final V join() {
        //先调用doJoin方法判断上面定义的四个状态
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

join方法先调用doJoin方法判断任务的状态,看看doJoin方法,

   private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        //获取当前ForkJoinWorkerThread线程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            //如果状态是小于0 也就是 -1,-2,-3 分别代表已完成、已取消、出现异常
            //直接返回状态
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                //如果状态为1|SIGNAL|信号
                //执行任务
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    //出现异常,把状态改为-3|EXCEPTIONAL|出现异常,返回
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    //执行成功,把状态改为-1|NORMAL|已完成,返回
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

doJoin查看任务的状态,如果状态是-1|NORMAL|已完成,-2|CANCELLED|已取消,-3|EXCEPTIONAL|出现异常,证明任务已经执行完毕,返回状态位,如果状态是 1|SIGNAL|信号,则去执行任务,如果执行成功返回-1|NORMAL|已完成,出现异常返回-3|EXCEPTIONAL|出现异常。
再来看看返回结果的reportResult方法和getRawResult方法:

private V reportResult() {
        int s; Throwable ex;
        //如果状态为-2|CANCELLED|已取消,抛出一个CancellationException异常
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        //调用getRawResult方法返回结果
        return getRawResult();
    }

reportResult方法,先会判断状态,如果状态为-2|CANCELLED|已取消,则抛出一个CancellationException异常,否则调用getRawResult方法返回结果:

    public abstract V getRawResult();

getRawResult方法在ForkJoinTask类是抽象方法,具体实现在他的两子类中。
RecursiveAction子类:

    public final Void getRawResult() { return null; }

所以说RecursiveAction子类使用于没有返回值的任务。
RecursiveTask子类:

public final V getRawResult() {
        return result;
    }

RecursiveTask子类适用于有返回值的任务。

并行执行任务的Fork/Join框架是说完了。
欢迎大家来交流,指出文中一些说错的地方,让我加深认识。
谢谢大家!

上一篇 下一篇

猜你喜欢

热点阅读