高并发编程/并行任务组件ForkJoinPool设计图分解(高手

2024-11-02  本文已影响0人  肖哥弹架构
image.png

ForkJoinTask 是 Java 并发编程中的强大工具,专为大规模并行计算设计。它通过将大型任务分解成小块(fork),并在多个处理器上并行执行这些小块,然后将结果合并(join),实现了高效的并行处理。这种分治策略不仅简化了并行编程,还充分利用了多核处理器的能力,特别适用于计算密集型任务。如果你是并发编程的爱好者或需要处理复杂计算任务的开发者,ForkJoinTask 提供了一种优雅且高效的解决方案。

历史热点文章

0、ForkJoinPool 业务执行流程

f7c7b706c070a72e3bff1cc7f9ebca4d_2b7ddba198484bd28382d5a34c8f559d.png

图说明:

  1. 提交任务到 ForkJoinPool:将一个 ForkJoinTask 任务提交到 ForkJoinPool 中执行。
  2. 任务分解(Fork) :任务被分解成多个子任务,这是 ForkJoinTask 设计的核心,允许任务递归地分解成更小的子任务。
  3. 子任务1、子任务2、更多子任务... :表示分解出来的多个子任务。
  4. 子任务执行:每个子任务被执行。
  5. 检查是否还有子任务:在子任务执行完成后,检查是否还有更多的子任务需要分解和执行。
  6. 任务合并(Join) :所有子任务完成后,父任务会合并所有子任务的结果。
  7. 返回结果:合并后的结果被返回。
  8. 结束:流程的终点。

1、ForkJoinTask 设计目的

ForkJoinTask 是 Java 7 引入的一个并行计算框架的核心组件,特别适用于可以被自然地分解为子任务的工作负载,这些子任务可以独立执行并最终合并结果。以下是 ForkJoinTask 的设计因素:

  1. 分治算法的并行化ForkJoinTask 的设计初衷是为了支持分治算法(Divide and Conquer)的并行计算。这种算法将大问题分解成小问题,递归解决小问题,然后将结果合并以解决原始问题。
  2. 利用多核处理器:随着多核处理器的普及,ForkJoinTask 提供了一种高效的方式来利用多核处理器的并行能力,通过将任务分解并行执行,从而提高程序的执行效率。
  3. 工作窃取算法ForkJoinPool 实现了工作窃取算法,允许空闲线程从其他线程的任务队列中“窃取”任务来执行,减少线程空闲时间,提高资源利用率。
  4. 递归分解任务ForkJoinTask 允许任务在执行过程中动态地将自身分解成更小的子任务,这些子任务可以并行执行,最终合并结果。
  5. 线程池友好ForkJoinTask 专为 ForkJoinPool 设计,与标准的 ThreadPoolExecutor 相比,它提供了更细粒度的任务管理,适合于可以并行执行的任务。
  6. 任务合并ForkJoinTask 的子类通常包含一个 join 方法,用于获取任务的结果。当一个任务被分解成多个子任务时,可以通过调用父任务的 join 方法来等待所有子任务完成并合并结果。
  7. 灵活性和扩展性ForkJoinTask 提供了 forkjoin 方法,允许开发者手动控制任务的分解和结果的合并,提供了极高的灵活性和扩展性。
  8. 内置任务:Java 提供了一些内置的 ForkJoinTask 实现,如 RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。

2、ForkJoinTask 相关组件概述

  1. ForkJoinPool
    • ForkJoinPoolForkJoinTask 执行的核心,它实现了 ExecutorService 接口,并通过工作窃取算法来平衡线程间的工作负载。每个工作线程都有一个自己的任务队列,当一个线程完成任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高 CPU 的利用率。
  2. ForkJoinTask
    • ForkJoinTask 是一个抽象类,代表在 ForkJoinPool 中执行的轻量级任务。它有两个重要的子类:RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。这两个子类都需要实现 compute() 方法来定义任务的逻辑。
  3. 工作窃取算法
    • 工作窃取算法允许空闲线程从其他线程的任务队列中“窃取”任务来执行。默认情况下,工作线程从自己的任务队列头部获取任务。当队列为空时,线程会从其他忙碌线程的队列尾部“窃取”任务,或者从全局入口队列中获取任务,因为这些地方最有可能存在较大的工作量。
  4. 任务的分解与合并
    • ForkJoinTask 的实现中,任务通常被递归地分解成更小的子任务(Fork),直到它们足够小,可以直接异步执行。然后,这些子任务的结果被递归地合并(Join)成一个单一的结果。
  5. ForkJoinTask 的方法
    • fork():将任务放入队列并安排异步执行。
    • join():等待任务完成并返回结果。
    • invoke():结合 fork()join(),启动任务,等待其结束并返回结果。
  6. 提交任务到 ForkJoinPool
    • 可以通过 ForkJoinPoolinvoke()execute()submit() 方法提交 ForkJoinTask 任务。invokeAll() 方法可以同时提交多个任务,并返回一个 Future 列表。

3、ForkJoinTask 业务组件设计

c51cb9b290a8d0568c26211d031e1a01_ff0dac8c606748be8d4dc7075cba1f8d.png
  1. 核心组件
    • ForkJoinPool:整个框架的中心节点。
    • 工作窃取算法:包括空闲线程窃取任务和平衡工作负载。
    • 双端队列:每个线程维护的任务队列。
    • 任务分解(Fork/Join):任务被分解成子任务,并行执行后合并结果。
  2. 任务管理
    • 任务类型:包括 RecursiveActionRecursiveTask
    • 线程管理:动态管理线程数量。
    • 局部性优化:任务通常由提交它们的线程执行。
  3. 异常与控制
    • 异常处理:捕获并处理任务执行过程中的异常。
    • 任务提交:包括 executesubmitinvoke 方法。
    • 任务同步:包括 getjoininvoke 方法来同步任务执行和获取结果。
    • 取消和超时:支持任务取消和超时控制。
  4. 监控与配置
    • 管理界面:监控和控制线程池行为。
    • 公平性和优先级:默认不支持,但可以通过自定义实现。
    • 任务的不可中断性:任务默认是不可中断的。

4、ForkJoinTask 工作窃取算法流程

e902b76a776f0a92fea60a6da4aad465_1f40d5648b264b50b16ba41ad78d1d16.png

步骤:

  1. 开始:并行计算的起点。
  2. 线程X - 任务队列X:每个线程都有自己的任务队列。
  3. 线程X执行任务:线程从自己的任务队列中取出任务并执行。
  4. 线程X空闲? :检查线程是否空闲(即任务队列为空)。
  5. 窃取任务:如果线程空闲,它会尝试从其他线程的任务队列中窃取任务。

5、ForkJoinTask 常用方法

5.1. 任务执行和控制

5.2. 结果处理

5.3. 异常和取消

5.4. 任务组合和依赖

5.5. 辅助方法

5.6. 任务重用

6、ForkJoinTask 应用案例

6.1 入门案例

业务数据,表示一年中每一天的销售额(单位:元):

int[] dailySales = {
    10000, 15000, 12000, 18000, 16000, 20000, 21000, 19000, 17000, 15000,
    14000, 13000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000,
    19000, 20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000,
    29000, 30000, 31000, 32000, 33000, 34000, 35000, 36000, 37000, 38000,
    39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000,
    49000, 50000
};

代码实现

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// 继承 RecursiveTask 并实现 compute 方法,用于计算数组的和
class SumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private final int[] array; // 要计算的数组
    private final int start;   // 数组的开始索引
    private final int end;     // 数组的结束索引

    // 构造函数,初始化数组和索引范围
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    // 实现 compute 方法,该方法将被并行执行
    protected Long compute() {
        long sum = 0; // 用于累积求和的结果
        int length = end - start; // 计算当前任务处理的数组长度

        // 如果任务足够小,直接计算
        if (length <= 10000) {
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
        } else {
            // 如果任务较大,分成两个子任务
            int middle = (start + end) / 2;
            // 创建子任务1,处理数组的前半部分
            SumTask subTask1 = new SumTask(array, start, middle);
            // 创建子任务2,处理数组的后半部分
            SumTask subTask2 = new SumTask(array, middle, end);

            // 执行子任务,并行计算
            subTask1.fork();
            subTask2.fork();

            // 等待子任务完成并合并结果
            sum = subTask1.join() + subTask2.join();
        }
        return sum;
    }
}

public class ParallelSumCalculation {
    public static void main(String[] args) {
        // 拟一年的每天销售额数据
        int[] dailySales = {
            // ... (一年的每天销售额数据)
        };

        // 创建 ForkJoinPool 线程池
        ForkJoinPool pool = new ForkJoinPool();
        // 创建 SumTask 任务,处理整个销售额数组
        SumTask task = new SumTask(dailySales, 0, dailySales.length);
        // 执行任务并获取结果
        long totalSales = pool.invoke(task);

        System.out.println("一年的总销售额为: " + totalSales + "元");
    }
}

6.2 订单总销售额计算

一个电子商务平台需要计算所有订单的总销售额。这个平台有大量的订单数据,分布在不同的数据库分片中。为了快速得到总销售额,我们可以利用ForkJoinTask来并行处理每个数据库分片的数据,然后将结果合并。

计算总销售额代码

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.Arrays;

// 订单类,包含订单的ID和销售额
class Order {
    long id;
    double sales;

    public Order(long id, double sales) {
        this.id = id;
        this.sales = sales;
    }
}

// 计算订单总销售额的任务
class CalculateSalesTask extends RecursiveTask<Double> {
    private static final int THRESHOLD = 100; // 分解任务的阈值
    private Order[] orders;
    private int start;
    private int end;

    public CalculateSalesTask(Order[] orders, int start, int end) {
        this.orders = orders;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        double totalSales = 0.0;
        if (end - start < THRESHOLD) {
            // 任务足够小,直接计算
            for (int i = start; i < end; i++) {
                totalSales += orders[i].sales;
            }
        } else {
            // 任务较大,分成两个子任务
            int middle = (start + end) / 2;
            CalculateSalesTask subTask1 = new CalculateSalesTask(orders, start, middle);
            CalculateSalesTask subTask2 = new CalculateSalesTask(orders, middle, end);

            // 执行子任务
            subTask1.fork();
            subTask2.fork();

            // 等待子任务完成并合并结果
            totalSales = subTask1.join() + subTask2.join();
        }
        return totalSales;
    }
}

public class ECommerceSalesCalculation {
    public static void main(String[] args) {
        // 订单数据
        Order[] orders = new Order[1000];
        for (int i = 0; i < orders.length; i++) {
            orders[i] = new Order(i, Math.random() * 10000); // 随机生成销售额
        }

        // 创建 ForkJoinPool 线程池
        ForkJoinPool pool = new ForkJoinPool();
        // 创建任务,处理整个订单数组
        CalculateSalesTask task = new CalculateSalesTask(orders, 0, orders.length);
        // 执行任务并获取结果
        double totalSales = pool.invoke(task);

        System.out.println("总销售额为: " + totalSales);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读