右耳菌-邓小白的Java架构师的修炼之路

Java并发 - fork/join并发处理框架

2022-06-22  本文已影响0人  右耳菌

思想:分而治之

用来做什么

ForkJoinPool是ExecutorService接口的实现,它专为可以递归分解成小块的工作而设计。fork / join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。使用fork / join框架的第一步是编写执行一部分工作的代码。类似的伪代码如下:

如果(当前工作部分足够小)
直接做这项工作
其他
把当前工作分成两部分调用这两个部分并等待结果

将此代码包装在ForkJoinTask子类中,通常是RecursiveTask (可以返回结果)或RecursiveAction.

先来看一个例子,下边的例子是模拟读取一个大文件的过程,使用了java多线程中的Callable方式,每个线程最多读取10个,这样的多线程一起读取的方式来提升效率的。

package forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ExecuteTask_Demo {

     /*
    思考:现在有很多网络地址存在ArrayList中,我需要做网络请求,
    为了并发执行,我就需要将这个列表进行拆分
     */

    static ArrayList<String> urls = new ArrayList<String>() { //假设当前是一个从外部读取的资源文件
        {
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
            add("http://www.baidu.com");
            add("http://www.sina.com");
        }
    };


    public static String doRequest(String url) {
        //模拟网络请求
        return "Kane ... read ... " + url + "\n";
    }


    static class Task implements Callable<String> {

        int start;
        int end;

        public Task(int start, int end) {
            this.start = start;
            this.end = end;
        }


        @Override
        public String call() throws Exception {
            String result = "";
            for (int i = start; i < end; i++) {
                result += doRequest(urls.get(i));
            }
            return result;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService pool = Executors.newFixedThreadPool(4);

        List<Future> futures = new ArrayList<>();

        int size = urls.size();

        int groupSize = 10;

        int groupCount = (size - 1) / groupSize + 1;

        for (int groupIndex = 0; groupIndex < groupCount - 1; groupIndex++) {
            int leftIndex = groupSize * groupIndex;
            int rightIndex = groupSize * (groupIndex + 1);

            Future<String> future = pool.submit(new Task(leftIndex, rightIndex));
            futures.add(future);
        }
        for (Future future : futures) {
            System.out.println(future.get());
        }
        
    }
}

而ForkJoinPool其实本质上来说也是类似的的方式,但是它是使用了二分拆分的方式(类似二分查找),对任务进行拆分,然后拆分的任务结束后再合并起来,一起返回。

forkjoinpool 的例子如下

package forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest {

    /*
    思考:现在有很多网络地址存在ArrayList中,我需要做网络请求,
    为了并发执行,我就需要将这个列表进行拆分
    */

    static ArrayList<String> urls = new ArrayList<String>() { //假设当前是一个从外部读取的资源文件
        // 忽略代码
    }


    public static String doRequest(String url) {
        //模拟网络请求
        return "Kane ... read ... " + url + "\n";
    }

    static class Job extends RecursiveTask<String> {

        List<String> urls;
        int start;
        int end;

        public Job(List<String> urls, int start, int end) {
            this.urls = urls;
            this.start = start;
            this.end = end;
        }

        @Override
        protected String compute() { //定义任务拆分的规则,是forkjoinpool 的 核心内容
            int count = end - start;

            if (count <= 10) {
                //直接执行
                String rs = "";
                for (int i = start; i < end; i++) {
                    String response = doRequest(urls.get(i));
                    rs += response;
                }
                return rs;
            } else {
                //拆分
                int x = (start + end) / 2;
                Job job1 = new Job(urls, start, x);
                job1.fork();
                Job job2 = new Job(urls, x, end);
                job2.fork();

                String rs = "";
                rs += job1.join();
                rs += job2.join();
                return rs;
            }

        }
    }

    static ForkJoinPool forkJoinPool = new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Job job = new Job(urls, 0, urls.size());
        ForkJoinTask<String> forkJoinTask = forkJoinPool.submit(job);

        String result = forkJoinTask.get();
        System.out.println(result);

    }
}


意图梳理

实现思路

适用

适合数据处理、结果汇总、统计等场景;
java8实例:java.util.Arrays类用于其parallelSort()方法

结语:工作窃取带来的性能提升偏理论,API的复杂性较高,实际研发中可控性来说不如其他API。


如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

上一篇 下一篇

猜你喜欢

热点阅读