并行数据处理与性能

2019-02-07  本文已影响0人  PawsUp


这时,若n特别大的时候,就可以做并行处理,将顺序流转换为并行流:对顺序流调用parallel即可:在处理时,会把Stream在内部分成几块,在不同的块独立并行归纳操作,最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。

  public static long parallelSum(long n) {
       return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .parallel()//将流转换为并行流
                    .reduce(0L, Long::sum);
  } 
image.png

但是,在实际操作时,对顺序流调用parallel方法并不意味着对流有任何实际的变化。它内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。对并行流调用sequential方法就可以把它变成顺序流。将两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如:

  stream.parallel()
        .filter(...)
        .sequential()
        .map(...)
        .parallel()//最后一次parallel或sequential调用会影响整个流水线
        .reduce(); 

并行流默认使用的线程使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量。


并行化是需要付出代价的,并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。


根据可分解性总结的一些流数据源适不适合并行

可分解性
ArrayList 极佳
LinkedList
IntStream.range 极差
Stream.iterate
HashSet
TreeSet

分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

  1. 使用RecursiveTask
    把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型,则需要实现它的抽象方法compute:
    protected abstract R compute();
    这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

     if (任务足够小或不可分) {
         顺序计算该任务
     } else {
          将任务分成两个子任务
          递归调用本方法,拆分每个子任务,等待所有子任务完成
          合并每个子任务的结果
     } 
    

例如:利用分支/合并框架执行并行求和

public class ForkJoinSumCalculator
             extends java.util.concurrent.RecursiveTask<Long> {

     private final long[] numbers;
     private final int start;
     private final int end;

     public static final long THRESHOLD = 10_000;//不再将任务分成子任务的大小

     public ForkJoinSumCalculator(long[] numbers) {
           this(numbers, 0, numbers.length);
     }

     private ForkJoinSumCalculator(long[] numbers, int start, int end) {
           this.numbers = numbers;
           this.start = start;
           this.end = end;
     }

   @Override
   protected Long compute() {
         int length = end - start;
         if (length <= THRESHOLD) {//如果大小小于或等于阈值,顺序计算结果
               return computeSequentially();
         }
         ForkJoinSumCalculator leftTask =
                   new ForkJoinSumCalculator(numbers, start, start + length/2);
         leftTask.fork();//利用另一个ForkJoinPool线程异步执行新创建的子任务
         ForkJoinSumCalculator rightTask =
                   new ForkJoinSumCalculator(numbers, start + length/2, end);//创建一个任务为数组的后一半数组求和
         Long rightResult = rightTask.compute();//同步执行第二个子任务,有可能允许进一步递归划分
         Long leftResult = leftTask.join();//读取第一个子任务的结果,如果尚未完成就等待
         return leftResult + rightResult;
   }

    //在子任务不再可分时计算结果的简单算法
   private long computeSequentially() {
         long sum = 0;
         for (int i = start; i < end; i++) {{
             sum += numbers[i];
         }
       return sum;
   }
} 

把数字数组传给ForkJoinSumCalculator的构造函数,即可实现并行对前n个自然数求和:

public static long forkJoinSum(long n) {
     long[] numbers = LongStream.rangeClosed(1, n).toArray();
     //ForkJoinTask为RecursiveTask的父类
     ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
     return new ForkJoinPool().invoke(task);//invoke用来执行某个对象的目标方法
} 
image.png

//7.3.2自定义Spliterator

上一篇下一篇

猜你喜欢

热点阅读