12.Fork_Join框架

2020-03-03  本文已影响0人  强某某
  1. 用来做什么
    ForkJoinPool是ExecutorService(线程池服务)接口的实现,它专为可以递归分解成小块的工作而设计。
    for/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。
    使用fork/join框架的第一步是编写执行一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
    直接做这项工作
其他
    把当前工作分成两部分
    调用这两个部分并等待结果

将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或者RecursiveAction(不可以返回结果)

ForkJoinTask是RecursiveAction与RecursiveTask的父类, ForkJoinTask中使用了模板模式进行设计
,将ForkJoinTask的执行相关的代码进行隐藏,通过提供抽象类暴露用户的实际业务处理。

  1. 意图梳理
    关键点:分解任务fork出新任务,汇集join任务执行结果


    1.png

ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoin的框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。

  1. 工作窃取


    2.png

说明:所谓工作窃取区别于传统线程池,是因为,虽然也是多线程工作,但是线程池是自己线程干自己的事情,干完了就休息,但是ForkJoin的工作窃取是当自己线程任务队列为空之后,则取其他任务队列取任务帮助完成,所以更加充分的利用CPU,性能更高。

  1. 实现思路
  1. 适用
  1. 基本使用
    使用ForkJoin框架,需要创建一个ForkJoin的任务,而ForkJoinTask是一个抽象类,我们不需要去继承ForkJoinTask进行使用。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。
private static class SumTask extends RecursiveTask<Integer> {

        private  int threshold ;
        private static final int segmentation = 10;

        private int[] src;

        private int fromIndex;
        private int toIndex;

        public SumTask(int formIndex,int toIndex,int[] src){
            this.fromIndex = formIndex;
            this.toIndex = toIndex;
            this.src = src;
            this.threshold = src.length/segmentation;
        }

        @Override
        protected Integer compute() {
            //核心就是该方法

            //可知forkjoin只提供具体抽象,但是实际任务怎么拆分还是
            //看具体用户怎么书写此处的代码

            //此处大意就是if内部的就是具体执行,else则是继续拆分任务,不做具体执行,实际拆分之后最终都会进入if内部
            if((toIndex - fromIndex)<threshold ){
                int count = 0;
                System.out.println(" from index = "+fromIndex
                        +" toIndex="+toIndex);
                for(int i = fromIndex;i<=toIndex;i++){
                  count+=src[i];
                }
                return count;
            }else{
                int mid = (fromIndex+toIndex)/2;
                SumTask left =  new SumTask(fromIndex,mid,src);
                SumTask right = new SumTask(mid+1,toIndex,src);
                invokeAll(left,right);
                return left.join()+right.join();
            }
        }
    }

使用ForkJoinPool进行执行
task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,
进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作窃取)。

 public static void main(String[] args) {
            int[]  array = MakeArray.createIntArray();
        ForkJoinPool forkJoinPool= new ForkJoinPool();
        SumTask sumTask  = new SumTask(0,array.length-1,array);

        long start = System.currentTimeMillis();

        forkJoinPool.invoke(sumTask);
        System.out.println("The count is "+sumTask.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }
  1. Future
    Future表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成以及获取结果的方法

Future的类图结构


3.png

如上图可知,ForkJoin框架以及Future都是属于Future的子类或者抽象类继承类

public class CallDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service= Executors.newCachedThreadPool();

        Callable<String> callable=new Callable<String>() {
            @Override
            public String call() throws Exception {
                //实际是运行在runable的run方法里面,所以也是多线程
                return null;
            }
        };
        //方式一:通过线程池-其实还是多线程运行,会发现内部就是FutureTask
//        service.submit(callable);

        //方式二:通过FutureTask包装callable
        FutureTask<String> task=new FutureTask<String>(callable);
        new Thread(task).start();
       String a= task.get();//通过这种方式获取callable内部call的返回值
//        System.out.println(a);null

    }
}

之前如果想获取异步结果返回值,则需要手动通过countDownLatch实现,但是有了FutureTask则可以很方便的获取

  1. FutureTask应用

如上面的例子,FutureTask是为了更加简单的获取异步返回信息,并提供多线程或者线程池的操作,从而进行多个异步操作同时进行,而结果只取最长的任务那个,同时提供get获取返回值


4.png

总的执行时间,取决于执行最慢的逻辑
逻辑之间无依赖关系,可同时执行,则可以应用多线程技术进行优化

  1. 自定义实现简单版本的FutureTask
public class TonyFutureTask<T> implements Runnable, Future {
    Callable<T> callable;//业务逻辑在callable里面
    T result=null;
    volatile String state="NEW";//task执行状态
    LinkedBlockingQueue<Thread> waiters=new LinkedBlockingQueue();

    public TonyFutureTask(Callable<T> callable) {
        this.callable=callable;
    }
    @Override
    public void run() {
        try {
            //从这个里可知,call确实是在run方法内部执行的
            result=callable.call();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            state="END";
        }
        //唤醒等待者
        Thread waiter=waiters.poll();
        while (waiter != null) {
            LockSupport.unpark(waiter);
            //继续取出队列中的等待者
            waiter=waiters.poll();
        }
    }
    @Override
    public T get() {
        if ("END".equals(state)) {
            return result;
        }
        waiters.offer(Thread.currentThread());//加入到等待队列,线程不继续往下执行了
        while (!"END".equals(state)) {
            //阻塞
            LockSupport.park();
        }
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }
    @Override
    public boolean isCancelled() {
        return false;
    }
    @Override
    public boolean isDone() {
        return false;
    }
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

上面使用FutureTask的例子都可以使用这个自定义的task从而达到相同目的,实际上jdk内部的FutureTask也是这么实现的,只不过更具体更细节

  1. 线程安全级别
上一篇下一篇

猜你喜欢

热点阅读