Java8 Stream 并行流

2020-09-27  本文已影响0人  垃圾简书_吃枣药丸

并行流就是把一系列数据自动拆分成多个数据块,并使用多个线程来处理这些数据块,这样就可以利用现代CPU多核的优势,把计算任务分配给多个CPU核心,最后再汇总结果。让它们都忙起来~

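# 并行流使用的线程池

并行流默认在 `ForkJoinPool.commonPool()` 中执行,其默认并行度为 CPU 核数减一,发起调用的线程(下例中的 `main`)也会参与计算。运行下面的例子可以看到:普通流的元素全部在 `main` 线程中处理,而并行流的输出来自 `main` 和多个 `ForkJoinPool.commonPool-worker-*` 线程。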

/**
 * Shows which threads process stream elements: a short sequential stream runs entirely on the
 * calling thread, while a longer parallel stream spreads its elements over several threads.
 * Each element is printed together with the name of the thread that handled it.
 */
public static void main(String[] args) {
    final Random rng = new Random();

    // Sequential stream: 10 random values, all consumed on the caller's thread.
    Stream<Integer> sequential = Stream.generate(() -> rng.nextInt(1000)).limit(10);
    sequential.forEach(n -> System.out.println(n + ": " + Thread.currentThread().getName()));

    // Parallel stream: 1000 random values, switched to parallel mode after limiting.
    Stream<Integer> parallel = Stream.generate(() -> rng.nextInt(1000)).limit(1000).parallel();
    parallel.forEach(n -> System.out.println(n + ": " + Thread.currentThread().getName()));
}
/**
 * Sums a {@code long[]} (or a half-open sub-range of it) using the fork/join framework.
 *
 * <p>Ranges of at most {@link #MIN_BATCH_SIZE} elements are summed directly with a loop. Larger
 * ranges are split at their midpoint: the left half is forked so another worker can pick it up,
 * the right half is computed on the current thread, and the two partial sums are then combined.
 * Every {@link #compute()} call prints a trace line tagged with the current thread name so the
 * work distribution can be observed.
 */
@Getter
@Setter
@Slf4j
public class ForkJoinCalculator extends RecursiveTask<Long> {

    /**
     * The JVM-wide common ForkJoinPool, used by {@link #sum(long[])}.
     */
    private static final ForkJoinPool FORK_JOIN_POOL = ForkJoinPool.commonPool();
    /**
     * Maximum number of elements a task sums directly; larger ranges are split in two.
     */
    private static final int MIN_BATCH_SIZE = 1000;

    /** The array being summed; shared (not copied) between the parent task and its subtasks. */
    private long[] dataArray;
    /** First index (inclusive) of the range this task sums. */
    private int startIndex;
    /** End index (exclusive) of the range this task sums. */
    private int endIndex;

    /**
     * Creates a task that sums every element of {@code dataArray}.
     *
     * @param dataArray the values to add up; must not be {@code null}
     */
    public ForkJoinCalculator(long[] dataArray) {
        this.dataArray = dataArray;
        this.startIndex = 0;
        this.endIndex = dataArray.length;
    }

    /**
     * Creates a subtask that sums {@code dataArray[startIndex, endIndex)}.
     */
    private ForkJoinCalculator(long[] dataArray, int startIndex, int endIndex) {
        this.dataArray = dataArray;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    /**
     * Sums all elements of {@code dataArray} by invoking a new calculator on the common pool.
     *
     * @param dataArray the values to add up; must not be {@code null}
     * @return the sum of all elements ({@code 0} for an empty array)
     */
    public static long sum(long[] dataArray) {
        return FORK_JOIN_POOL.invoke(new ForkJoinCalculator(dataArray));
    }

    /**
     * Returns the sum of {@code dataArray[startIndex, endIndex)}. Overflow wraps like ordinary
     * {@code long} addition.
     */
    @Override
    protected Long compute() {
        ForkJoinCalculator.printWithThread("cur startIndex=%s,endIndex=%s", startIndex, endIndex);
        long curTotal = 0L;
        if (endIndex - startIndex <= MIN_BATCH_SIZE) {
            // Small enough (at most MIN_BATCH_SIZE elements): sum directly.
            for (int i = startIndex; i < endIndex; i++) {
                curTotal += dataArray[i];
            }
            ForkJoinCalculator.printWithThread("直接计算curTotal=%s", curTotal);
        } else {
            // Too large: split [startIndex, endIndex) into two halves.
            // Unsigned shift keeps the midpoint correct even if startIndex + endIndex overflows int.
            int middleIndex = (startIndex + endIndex) >>> 1;
            ForkJoinCalculator leftForkJoinCalculator = new ForkJoinCalculator(dataArray, startIndex, middleIndex);
            ForkJoinCalculator rightForkJoinCalculator = new ForkJoinCalculator(dataArray, middleIndex, endIndex);
            // fork() pushes the left half onto this worker's queue so another thread can steal it.
            leftForkJoinCalculator.fork();
            // Meanwhile the current thread computes the right half (which may split further).
            long rightTotal = rightForkJoinCalculator.compute();
            // Only now wait for the left half. Calling fork().join() back-to-back would block
            // right away and run the two halves one after another instead of concurrently.
            long leftTotal = leftForkJoinCalculator.join();
            ForkJoinCalculator.printWithThread("leftTotal=%s,rightTotal=%s", leftTotal, rightTotal);
            curTotal = leftTotal + rightTotal;
        }
        return curTotal;
    }

    /**
     * Formats {@code args} with {@link String#format}, prefixes the current thread's name and
     * prints the line to standard output.
     *
     * @param format a {@link String#format} pattern
     * @param args   the values substituted into {@code format}
     */
    public static void printWithThread(String format, Object... args) {
        String formatStr = String.format(format, args);
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + ": " + formatStr);
    }
}

# 性能与注意点

使用并行流并不保证性能一定比非并行流和for循环好,有时候甚至更差,这取决于数据量、每个元素的处理开销,以及数据源的数据结构是否易于拆分。并且,在使用并行流之前,必须确保用得对,否则可能出现计算结果错误的严重后果(见下例)。请记住,并行化并不是没有代价的。

/**
 * Mutable running total used to show why shared state breaks parallel streams.
 *
 * <p>{@link #add(long)} performs an unsynchronized read-modify-write on {@code total}, so this
 * class is NOT thread-safe: concurrent callers can lose updates.
 */
public static class Add {
    /** Accumulated sum of every value passed to {@link #add(long)}. */
    private long total;

    /**
     * Adds {@code curVal} to the running total (not atomic).
     */
    public void add(long curVal) {
        this.total = this.total + curVal;
    }
}

/**
 * Sums 1..1,000,000 on a sequential stream into an {@link Add} accumulator and prints the
 * result. Only one thread touches the accumulator, so the printed value is always 500000500000.
 */
public void streamDemo() {
    final Add accumulator = new Add();
    LongStream values = LongStream.rangeClosed(1, 1_000_000);
    values.forEach(accumulator::add);
    System.out.println(accumulator.total);
}

/**
 * Sums 1..1,000,000 on a parallel stream into a shared {@link Add} accumulator and prints the
 * result. Several threads call {@code add} without synchronization, so updates are lost and the
 * printed total is usually wrong and varies between runs. Intentionally incorrect: this method
 * exists to illustrate the pitfall of mutating shared state from a parallel stream.
 */
public void parallelStreamDemo() {
    final Add accumulator = new Add();
    LongStream.rangeClosed(1, 1_000_000)
            .parallel()
            .forEach(value -> accumulator.add(value));
    System.out.println(accumulator.total);
}
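输出结果(第一行为串行流,第二行为并行流):

500000500000
53692171876

串行流得到了正确结果 500000500000;并行流的结果是错误的,而且每次运行都可能不同。原因在于 `total += curVal` 不是原子操作(先读取、再相加、最后写回),多个线程同时修改共享的 `total` 时会互相覆盖,导致部分累加丢失。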

# 高效使用并行流的建议

| 数据源 | 可拆分性 |
| --- | --- |
| ArrayList | 极佳 |
| LinkedList | 差 |
| IntStream.range() | 极佳 |
| Stream.iterate() | 差 |
| HashSet | 好 |
| TreeSet | 好 |
上一篇 下一篇

猜你喜欢

热点阅读