程序员Java 并发基础原理

【Java 并发笔记】Fork/Join 框架相关整理(上)

2019-01-17  本文已影响4人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

2. 工作窃取(work-stealing)

work-stealing

工作窃取算法的优点

工作窃取算法的缺点

3. ForkJoinPool

ForkJoinPool 类图

3.1 Fork/Join 框架的使用

问题

解决方法

public class ExecutorServiceCalculator implements Calculator {
    private int parallism;
    private ExecutorService pool;

    public ExecutorServiceCalculator() {
        parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心数
        pool = Executors.newFixedThreadPool(parallism);
    }

    private static class SumTask implements Callable<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() throws Exception {
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        }
    }

    @Override
    public long sumUp(long[] numbers) {
        List<Future<Long>> results = new ArrayList<>();

        // 把任务分解为 n 份,交给 n 个线程处理
        int part = numbers.length / parallism;
        for (int i = 0; i < parallism; i++) {
            int from = i * part;
            int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1;
            results.add(pool.submit(new SumTask(numbers, from, to)));
        }

        // 把每个线程的结果相加,得到最终结果
        long total = 0L;
        for (Future<Long> f : results) {
            try {
                total += f.get();
            } catch (Exception ignore) {}
        }
        return total;
    }
}
public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 当需要计算的数字小于6时,直接计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否则,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

3.2 Fork/Join 框架的原理

ForkJoinPool

ForkJoinPool

WorkQueue

ForkJoinWorkThread

work-stealing

ForkJoinTask


fork 方法

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join 方法

join 方法的流程

3.2.1 ForkJoinPool

//  低位和高位掩码
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
 
// 活跃线程数
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活跃线程数增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活跃线程数掩码
 
// 工作线程数
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作线程数增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩码
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 创建工作线程标志
 
// 池状态
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
 
// 实例字段
volatile long ctl;                   // 主控制参数
volatile int runState;               // 运行状态锁
final int config;                    // 并行度|模式
int indexSeed;                       // 用于生成工作线程索引
volatile WorkQueue[] workQueues;     // 主对象注册信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 线程工厂
final UncaughtExceptionHandler ueh;  // 每个工作线程的异常信息
final String workerNamePrefix;       // 用于创建工作线程的名称
volatile AtomicLong stealCounter;    // 偷取任务总数,也可作为同步监视器
 
/** 静态初始化字段 */
//线程工厂
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//启动或杀死线程的方法调用者的权限
private static final RuntimePermission modifyThreadPermission;
// 公共静态pool
static final ForkJoinPool common;
//并行度,对应内部common池
static final int commonParallelism;
//备用线程数,在tryCompensate中使用
private static int commonMaxSpares;
//创建workerNamePrefix(工作线程名称前缀)时的序号
private static int poolNumberSequence;
//线程阻塞等待新的任务的超时值(以纳秒为单位),默认2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空闲超时时间,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
//默认备用线程数
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次数,用在在awaitRunStateLock和awaitWork中
private static final int SPINS  = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;

ForkJoinPool 对象

// parallelism 定义并行级别
public static ExecutorService newWorkStealingPool(int parallelism);
// 默认并行级别为 JVM 可用的处理器个数
// Runtime.getRuntime().availableProcessors()
public static ExecutorService newWorkStealingPool();
// 类静态代码块中会调用makeCommonPool方法初始化一个commonPool
public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
}
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

参数说明

类型及其修饰符 变量名 作用
volatile long ctl 主控制参数,分为 4 个区域保存(每 16 位为 1 个区域)。
volatile int runState 保存线程池的 运行状态
final int config 保存线程池的 最大线程数量 及其 是否采用了公平模式
int indexSeed 用于在构造 WorkQueue 时计算插入到 workQueues 的下标。
volatile WorkQueue[] workQueues 线程池持有的工作线程(即执行任务的线程)。
final ForkJoinWorkerThreadFactory factory 该线程池指定的线程工厂,用于生产 ForkJoinWorkerThread 对象。
final String workerNamePrefix 该线程池中工作线程的名称前缀。
volatile AtomicLong stealCounter 该线程池中所有的 WorkQueue 总共被窃取的任务数量。

ctl 变量说明

区域 属性 说明
1 AC 正在运行工作线程数减去目标并行度,高 16 位。(49-64 位)
2 TC 总工作线程数减去目标并行度,中高 16 位。(33-48 位)
3 SS 栈顶等待线程的版本计数和状态,中低 16 位。(17-32 位)
4 ID 栈顶 WorkQueue 在池中的索引(poolIndex),低 16 位。(1-16 位)

线程池状态(runState )说明

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;
private static final int  RSIGNAL    = 1 << 1;
private static final int  STARTED    = 1 << 2;
private static final int  STOP       = 1 << 29;
private static final int  TERMINATED = 1 << 30;
private static final int  SHUTDOWN   = 1 << 31;
private int lockRunState() {
    int rs;
    return ((((rs = runState) & RSLOCK) != 0 ||
             !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
            awaitRunStateLock() : rs);
}

config 变量

static final int SMASK        = 0xffff;        // short bits == max index
static final int LIFO_QUEUE   = 0;
static final int FIFO_QUEUE   = 1 << 16;
this.config = (parallelism & SMASK) | mode;

WorkQueue 对象

WorkQueue
类型及其修饰符 变量名 作用
volatile int scanState 保存这个 WorkQueue 的类型,线程是否繁忙(仅限 ACTIVE 类型)。
int stackPred 记录前驱 worker 的下标。
int nsteals 该 WorkQueue 被窃取的任务的总数。
int hint 用于窃取线程计算下次窃取的 workQueues 数组的下标。
int config 前 16 位(低 16 位)保存该 WorkQueue 在 workQueues 数组的下标,第 17 位(高 16 位)保存属于 LIFO 还是 FIFO 模式。
volatile int qlock 一个简单的锁,0 表示为加锁,1 表示已加锁,小于 0 表示当前 WorkQueue 已停止。
ForkJoinTask<?>[] array 任务队列,保存 ForkJoinTask 任务对象。
volatile int base bash 与 workQueues 数组长度取模的值窃取线程下次从 workQueues 数组取出任务的下标。
int top top 与 workQueues 数组长度取模的值即为下次将任务对象插入到 workQueues 数组的下标。
final ForkJoinPool pool 该 WorkQueue 对应的线程池。
final ForkJoinWorkerThread owner 该 WorkQueue 对应的工作线程对象(ACTIVE 类型的 WorkQueue 不会为 null)。
volatile Thread parker 当 currentThread 被 park(等待)时,用来保存这个线程对象来后续 unpark。
ForkJoinTask<?> currentJoin 调用 join 方法时等待结果的任务对象。
ForkJoinTask<?> currentSteal 保存正在执行的从别的 WorkQueue 窃取过来的任务。

WorkQueue 当前状态(scanState)

static final int SCANNING     = 1;             // false when running tasks
static final int INACTIVE     = 1 << 31;       // must be negative
static final int SS_SEQ = 1 << 16;

WorkQueue 锁标识(qlock)

config

static final int MODE_MASK    = 0xffff << 16;  // top half of int
int mode = config & MODE_MASK;
w.config = i | mode;

top 和 base

static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
base = top = INITIAL_QUEUE_CAPACITY >>> 1;

接下一篇 【Java 并发笔记】Fork/Join 框架相关整理(下)

上一篇 下一篇

猜你喜欢

热点阅读