ForkJoin框架

2018-12-11  本文已影响0人  单纯小码农

在这篇文章中,将覆盖如下内容:

1.什么是Fork/Join 框架

Fork/Join 框架是java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork Join 的运行流程图如下:

1.jpg

2.工作窃取

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

那么为什么需要使用工作窃取算法呢?

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。

但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

3.Fork/Join框架的设计

如何设计Fork/Join框架?
步骤1 分割任务
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 使用两个类来完成以上两件事情。
ForkJoinTask: 我们需要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。

ForkJoinPool: ForkJoinTask需要通过ForkJoinPool执行。

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

4.RecursiveAction抽象类——没有返回值的任务

源码浅读

  public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

  /**
   * 由这个任务执行的主要计算
   */
  protected abstract void compute();

  /**
   * 总是返回null
   */
  public final Void getRawResult() { return null; }

  /**
   * 搞不懂为什么要这么写?既然参数不用,为什么还要传入
   */
  protected final void setRawResult(Void mustBeNull) { }

  /**
   * 执行计算
   */
  protected final boolean exec() {
      compute();
      return true;
   }
}

继承自ForkJoinTask类。

compute()是抽象方法。

5. RecursiveAction实战--同步方式

目标:实现一个任务来修改产品列表的价格

步骤如下:

1.创建类Product,将用来存储产品的名称和价格。

public class Product {
  private String name;
  private double price;

  public String getName() {
      return name;
  }

  public void setName(String name) {
      this.name = name;
  }

  public double getPrice() {
      return price;
  }

  public void setPrice(double price) {
      this.price = price;
  }
}  

2.创建ProductListGenerator类,用来产生随机产品的数列

public class ProductListGenerator {
    public List<Product> generate(int size){
        List<Product> ret=new ArrayList<>();
        for(int i=0;i<size;i++){
            Product product = new Product();
            product.setName("Product"+i);
            product.setPrice(10);
            ret.add(product);
        }
        return ret;
    }
}

3.创建Task类,指定它继承RecursiveAction类。

public class Task extends RecursiveAction {
  private static final long serialVersionUID = 1L;

  private List<Product> products;

  private int first;
  private int last;

  private double increment;

  public Task(List<Product> products, int first, int last, double increment) {
      this.products = products;
      this.first = first;
      this.last = last;
      this.increment = increment;
  }

  @Override
  protected void compute() {
      if (last - first < 10) {
          updatePrices();
      } else {
          int middle = (last + first) / 2;
          System.out.printf("Task: Pending tasks: %s\n", getQueuedTaskCount());
          Task t1 = new Task(products, first, middle + 1, increment);
          Task t2 = new Task(products, middle + 1, last, increment);
        //它调用invokeAll()方法,执行每个任务所创建的子任务。这是一个同步调用,这个任务在继续(可能完成)它的执行之前,必须等待子任务的结束。
         // 当任务正在等待它的子任务(结束)时,正在执行它的工作线程执行其他正在等待的任务。
        // 在这种行为下,Fork/Join框架比Runnable和Callable对象本身提供一种更高效的任务管理。
          invokeAll(t1, t2);
      }
  }

  private void updatePrices() {
      for (int i = first; i < last; i++) {
          Product product = products.get(i);
          product.setPrice(product.getPrice() * (1 + increment));
      }
  }
}

4.通过创建Main类,并实现main()方法。

public class TaskMain {
  public static void main(String[] args) {
    ProductListGenerator generator = new ProductListGenerator();
    List<Product> products = generator.generate(100000);
    Task task = new Task(products, 0, products.size(), 0.2);
    System.out.println("task start");
//创建pool使用默认的无参构造函数,线程池使用的线程数由Runtime.getRuntime().availableProcessors()返回值决定
    ForkJoinPool pool = new ForkJoinPool();
    //在池中使用execute()方法执行这个任务。一个异步调用,而主线程继续它的执行。
    pool.execute(task);
    //显示每隔5毫秒池中的变化信息
    do {
        System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount());
        System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
        System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
        System.out.println("------------------------");
        try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } while (!task.isDone());

    pool.shutdown();
    //使用isCompletedNormally()方法检查假设任务完成时没有出错,在这种情况下,写入一条信息到控制台。
    if (task.isCompletedNormally()) {
        System.out.printf("Main: The process has completed normally.\n");
    }

    for (int i = 0; i < products.size(); i++) {
        Product product = products.get(i);
        if (product.getPrice() != 12) {
            System.out.printf("Product %s: %f\n", product.getName(), product.getPrice());
          }
      }
      System.out.println("Main: End of the program.\n");
  }
}

6. RecursiveTask抽象类——有返回值的任务

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * 计算的结果
     */
    V result;

    /**
     * 由这个任务执行的主要计算
     * @return 返回计算的结果
     */
    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }
  
   protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * 执行计算
     */
    protected final boolean exec() {
        result = compute();
        return true;
    }
}

继承自ForkJoinTask类。

compute()是抽象方法。

7.RecursiveTask 实战--同步方式

目标:开发一个在文档中查找单词的应用程序

步骤如下:

1.创建一个DocumentMock类,它将产生用来模拟文档的字符串的二维数组。

public class DocumentMock {
    private String words[] = {"the", "hello", "goodbye", "packt", "java", "thread", "pool", "random", "class", "main"};

    public String[][] generateDocument(int numLines, int numWords, String word) {
    int counter = 0;
    String document[][] = new String[numLines][numWords];
    Random random = new Random();
    for (int i = 0; i < numLines; i++) {
        for (int j = 0; j < numWords; j++) {
            int index = random.nextInt(words.length);
            document[i][j] = words[index];
            if (words[index].equals(word)) {
                counter++;
            }
        }
    }
    System.out.println("DocumentMock: The word appers " + counter + " times in the document");
    return document;
}

}

2.创建一个DocumentTask类,指定它继承RecursiveTask类,并参数化为Integer类型。该类将实现统计单词在一组行中出现的次数的任务。

public class DocumentTask extends RecursiveTask<Integer> {
    private String document[][];
    private int start, end;
    private String word;

    public DocumentTask(String document[][], int start, int end, String word) {
    this.document = document;
    this.start = start;
    this.end = end;
    this.word = word;
  }

  @Override
  protected Integer compute() {
    int result = 0;
    if (end - start < 10) {
        result = processLines(document, start, end, word);
    } else {
        int mid = (start + end) / 2;
        DocumentTask task1 = new DocumentTask(document, start, mid, word);
        DocumentTask task2 = new DocumentTask(document, mid, end, word);
        invokeAll(task1, task2);
        try {
            result = task1.get() + task2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
      }
      return result;
  }

  private int processLines(String[][] document, int start, int end, String word) {
    List<LineTask> tasks = new ArrayList<LineTask>();
    for (int i = start; i < end; i++) {
        LineTask task = new LineTask(document[i], 0, document[i].length, word);
        tasks.add(task);
    }
    invokeAll(tasks);
    int result = 0;
    for (int i = 0; i < tasks.size(); i++) {
        LineTask task = tasks.get(i);
        try {
            result = result + task.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
      }
      return result;
  }
}

3..创建LineTask类,指定它继承RecursiveTask类,并参数化为Integer类型。这个类将实现统计单词在一行中出现的次数的任务。

public class LineTask extends RecursiveTask<Integer> {
  private static final long serialVersionUID = 1L;

  private String line[];
  private int start;
  private int end;
  private String word;

  public LineTask(String line[], int start, int end, String word) {
    this.line = line;
    this.start = start;
    this.end = end;
    this.word = word;
  }

  @Override
  protected Integer compute() {
    int result = 0;
    if (end - start < 100) {
        result = count(line, start, end, word);
    } else {
        int mid = (start + end) / 2;
        LineTask task1 = new LineTask(line, start, mid, word);
        LineTask task2 = new LineTask(line, mid, end, word);
        invokeAll(task1, task2);
        try {
            result = task1.get() + task2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
      }
      return result;
  }

  private int count(String[] line, int start, int end, String word) {
    int counter = 0;
    for (int i = start; i < end; i++) {
        if (line[i].equals(word)) {
            counter++;
        }
    }
    //为了显示demo的执行,令任务睡眠10毫秒。
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return counter;
  }
}

4.实现示例的主类,通过创建Main类,并实现main()方法。

public class TaskMain {
   public static void main(String[] args) {
      DocumentMock documentMock = new DocumentMock();
      int numLines = 100;
      int lineWordsNum = 1000;
      String word = "the";
      String[][] document = documentMock.generateDocument(numLines, lineWordsNum, word);
      DocumentTask task = new DocumentTask(document, 0, numLines, word);

      ForkJoinPool forkJoinPool = new ForkJoinPool();
      forkJoinPool.execute(task);
      do {
          System.out.printf("******************************************\n");
          System.out.printf("Main: Parallelism: %d\n", forkJoinPool.getParallelism());
          System.out.printf("Main: Active Threads: %d\n", forkJoinPool.getActiveThreadCount());
          System.out.printf("Main: Task Count: %d\n", forkJoinPool.getQueuedTaskCount());
          System.out.printf("Main: Steal Count: %d\n", forkJoinPool.getStealCount());
          System.out.printf("******************************************\n");
          try {
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      } while (!task.isDone());

      forkJoinPool.shutdown();

      try {
          System.out.printf("Main: The word appears %d in the document", task.get());
      } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
      }
   }
  }

8.RecursiveTask实战--异步方式

目标:使用ForkJoinPool和ForkJoinTask类提供的异步方法来管理任务。实现一个程序,在一个文件夹及其子文件夹内查找确定扩展名的文件。

public class FolderProcessor extends RecursiveTask<List<String>> {
  private static final long serialVersionUID = 1L;
  private String path;

  private String extension;

  public FolderProcessor(String path, String extension) {
      this.path = path;
      this.extension = extension;
  }

  @Override
  protected List<String> compute() {
      //用来保存存储在文件夹中的文件。
      List<String> list = new ArrayList<>();
      //声明一个FolderProcessor任务的数列,用来保存将要处理存储在文件夹内的子文件夹的子任务
      List<FolderProcessor> tasks = new ArrayList<>();
      File file = new File(path);
      File content[] = file.listFiles();
      if (content != null) {
          //对于文件夹里的每个元素,如果是子文件夹,则创建一个新的FolderProcessor对象,并使用fork()方法异步地执行它。
          for (int i = 0; i < content.length; i++) {
              if (content[i].isDirectory()) {
                  FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
                  task.fork();
                  tasks.add(task);
              } else {
                  //否则,使用checkFile()方法比较这个文件的扩展名和你想要查找的扩展名
                  // 如果它们相等,在前面声明的字符串数列中存储这个文件的全路径。
                  if (checkFile(content[i].getName())) {
                      list.add(content[i].getAbsolutePath());
                  }
              }
          }
      }

      if (tasks.size() > 50) {
          System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
      }
      addResultsFromTasks(list, tasks);
      return list;
  }

  private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
      //对于保存在tasks数列中的每个任务,调用join()方法,这将等待任务执行的完成,并且返回任务的结果
      for (FolderProcessor item : tasks) {
          list.addAll(item.join());
      }
  }

  private boolean checkFile(String name) {
      return name.endsWith(extension);
  }

  public static void main(String[] args) {
      ForkJoinPool forkJoinPool = new ForkJoinPool();
      FolderProcessor folderProcessor1 = new FolderProcessor("/Users/wuzhenyu/IdeaProjects/ares2/ares-web/src/main/web/h5/src", "css");
      FolderProcessor folderProcessor2 = new FolderProcessor("/Users/wuzhenyu/IdeaProjects/ares2/ares-web/src/main/web/h5/node_modules/_acorn-dynamic-import@2.0.2@acorn-dynamic-import", "css");

      forkJoinPool.execute(folderProcessor1);
      forkJoinPool.execute(folderProcessor2);

      do {
          System.out.printf("******************************************\n");
          System.out.printf("Main: Parallelism: %d\n", forkJoinPool.getParallelism());
          System.out.printf("Main: Active Threads: %d\n", forkJoinPool.getActiveThreadCount());
          System.out.printf("Main: Task Count: %d\n", forkJoinPool.getQueuedTaskCount());
          System.out.printf("Main: Steal Count: %d\n", forkJoinPool.getStealCount());
          System.out.printf("***************************************** *\n");
          try {
              TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      } while ((!folderProcessor1.isDone()) || (!folderProcessor2.isDone()));

      forkJoinPool.shutdown();

      List<String> results;
      results = folderProcessor1.join();
      System.out.printf("Documents: %d files found.\n", results.size());
      results = folderProcessor2.join();
      System.out.printf("Documents: %d files found.\n", results.size());
  }
}

9.Fork/Join框架的异常处理

在Java中有两种异常:

  1. 已检查异常(Checked exceptions):这些异常必须在一个方法的throws从句中指定或在内部捕捉它们。比如:IOException或ClassNotFoundException。
  2. 未检查异常(Unchecked exceptions):这些异常不必指定或捕捉。比如:NumberFormatException。

在ForkJoinTask类的compute()方法中,你不能抛出任何已检查异常,因为在这个方法的实现中,它没有包含任何抛出(异常)声明。你必须包含必要的代码来处理异常。但是,你可以抛出一个未检查异常。

ForkJoinTask和ForkJoinPool类的行为与你可能的期望不同。程序不会结束执行,并且你将不会在控制台看到任何关于异常的信息。它只是被吞没,好像它没抛出(异常)。

实战目标:判断一个任务是否抛出异常

  public class ExceptionTask extends RecursiveTask<Integer> {
    private int array[];
    private int start;
    private int end;

    public ExceptionTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        System.out.printf("Task: Start from %d to %d\n", start, end);
        if (end - start < 10) {
            if (start < 3 && end > 3) {
                throw new RuntimeException("This task throws an Exception:Task from " + start + " to " + end);
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            int mid = (start + end) / 2;
            ExceptionTask exceptionTask1 = new ExceptionTask(array, start, mid);
            ExceptionTask exceptionTask2 = new ExceptionTask(array, mid, end);
            invokeAll(exceptionTask1, exceptionTask2);
      }
        System.out.printf("Task: End form %d to %d\n", start, end);
      return 0;
    }

  public static void main(String[] args) {
      int array[] = new int[100];
      ExceptionTask task = new ExceptionTask(array, 0, 100);
      ForkJoinPool pool = new ForkJoinPool();
      pool.execute(task);
      pool.shutdown();

      try {
          pool.awaitTermination(1, TimeUnit.DAYS);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

      //使用isCompletedAbnormally()方法,检查这个任务或它的子任务是否已经抛出异常。
      if (task.isCompletedAbnormally()) {
          System.out.printf("Main: An exception has ocurred\n");
          System.out.printf("Main: %s\n", task.getException());
      }
      System.out.printf("Main: Result: %d", task.join());
    }
}

10.ForkJoinTask 抽象类

public abstract class ForkJoinTask<V> implements Future<V>, Serializable

ForkJoinTask是运行于ForkJoinPool的任务的抽象基类。在牺牲一些使用上的限制的情况下,大量任务和子任务可以被 ForkJoinPool 的少量实际线程托管。

主任务被明确提交到ForkJoinPool时开始执行。一旦开始,它通常会开始启动其他子任务。正如该类的名称所示,许多使用ForkJoinTask的程序只使用方法fork()和join()或派生方法(如invokeAll)。然而这个类还提供了许多其他方法,用来支持更高级的用法。

ForkJoinTask的任务有四种状态:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL

常用方法介绍

方法1: fork()

public final ForkJoinTask<V> fork()

安排异步执行此任务

returns:
this

源码浅读:

public final ForkJoinTask<V> fork() {
      Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

如果当前线程是ForkJoinWorkerThread的实例,就把任务加入workQueue中。否则把任务提交到commonPool中。

方法2: get()

public final V get() throws InterruptedException,ExecutionException
等待必要的计算完成,然后获取其结果。

该方法会抛出三类异常:

  1. 当计算被取消时,抛出CancellationException
  2. 如果计算抛出异常,抛出ExecutionException
  3. 如果当前线程不是ForkJoinPool的成员,并且在等待时被中断,
    则抛出InterruptedException

returns:
返回计算结果

源码浅读:

 public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);
    return getRawResult();
}

doJoin()和externalInterruptibleAwaitDone()方法都会返回当前任务的状态, 且都会造成阻塞, 直到完成或中断。其中externalInterruptibleAwaitDone()方法会在线程中断时,抛出InterruptedException
根据当前线程是否是ForkJoinWorkerThread的实例,来决定调用哪个方法。
如果任务被取消(CANCELLED),则抛出CancellationException
如果任务出现异常,则抛出对应的异常。
否则返回结果。

方法3:join()

public final V join()
该方法会阻塞当前线程并等待获取结果。

returns:
返回计算结果

源码浅读:

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

如果doJoin()方法返回的状态不是 NORMAL(已完成),就返回抛出与给定状态关联的异常(如果有的话)。
否则 返回结果。

方法4:invoke()

public final V invoke()

开始执行此任务,如果有必要则等待它完成,返回结果或抛出一个(非检查) RuntimeException 或 Error(如果底层运算出现这样的操作)

returns:
返回计算结果

源码浅读:

public final V invoke() {
    int s;
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

如果doInvoke()方法返回的状态不是 NORMAL(已完成),就返回抛出与给定状态关联的异常(如果有的话)。
否则 返回结果。

我们可以看到,invoke()方法内部调用了doInvoke(),join方法内部调用了doJoin()。那么这两者有什么区别?请看源码:

doJoin源码:

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

doJoin方法首先会判断任务是否完成。如果完成了,就返回status。如果还没有完成,则会判断当前线程是不是ForkJoinWorkerThread的实例。

如果当前线程是ForkJoinWorkerThread的实例,就进行下面的判断。
(w = (wt = (ForkJoinWorkerThread)t).workQueue). ​ tryUnpush(this) && (s = doExec()) < 0? s : ​ wt.pool.awaitJoin(w, this, 0L) :
意思是该任务位于workQueue的顶部(tryUnpush会返回布尔值,判断任务是否处于Top),就调用doExec()方法执行任务,如果任务已经完成,就返回status。否则就调用wt.pool.awaitJoin(w, this, 0L)等待任务完成。

如果当前线程不是ForkJoinWorkerThread的实例,就调用externalAwaitDone(),等待完成。

doInvoke源码:

 private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

从源码中可知,doInvoke会立即调用doExec()方法来执行任务。如果任务完成了,就返回status。

否则根据当前线程是否是ForkJoinWorkerThread的实例,来决定调用哪个方法等待任务完成。

结论: invoke()方法会立即执行任务,而join()方法需要当前任务处于workQueue顶部才会执行任务(没有完成的情况下)。

方法5: invokeAll()

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)

public static void invokeAll(ForkJoinTask<?>... tasks)

public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)

执行所给定的任务。当所有任务都结束了或碰到非受查异常(此时异常将会被重新抛出)就会返回。如果不止一个任务遇到异常,那么这个方法将会抛出其中一个异常。可以使用getException()方法得到每个任务的状态,或者使用相关方法检查任务是否是被取消,或异常完成,或正常完成,或未处理。

如果任意任务为null,抛出NullPointerException

源码浅读:

 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    int s1, s2;
    t2.fork();
    if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
        t1.reportException(s1);
    if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
        t2.reportException(s2);
}

t2调用fork()异步执行任务。t1顺序执行任务。如果有任务非正常完成,就会抛出与任务状态相对应的异常。

方法6:cancel()

public boolean cancel(boolean mayInterruptIfRunning)

尝试取消此任务。当此方法返回成功后,除非调用reinitialize()进行干预,否则随后调用isCancelled()isDone()cancel方法都将返回true,调用join()和其相关方法将导致CancellationException结果。

此方法旨在由其他任务调用。要终止当前任务,只需从其计算方法中返回或抛出未经检查的异常,或者调用completeExceptionally(Throwable)

returns:
如果此任务当前已取消,返回true

源码浅读:

 public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

mayInterruptIfRunning这个参数对这个方法的结果没有影响。
通过setCompletion方法设置任务的状态。

方法7: isDone()

public final boolean isDone()

returns:
如果任务已经完成,返回true

源码浅读:

public final boolean isDone() {
    return status < 0;
}

结束可能是由于正常终止,异常或取消,在所有这些情况下,此方法将返回true。

方法8: isCancelled()

public final boolean isCancelled()

returns:
如果该任务在正常完成之前被取消,则返回true

源码浅读:

public final boolean isCancelled() {
    return (status & DONE_MASK) == CANCELLED;
}

任务状态为CANCELLED就返回true

方法9: isCompletedAbnormally()

public final boolean isCompletedAbnormally()

returns:
如果此任务抛出异常或被取消,返回true。

public final boolean isCompletedAbnormally() {
    return status < NORMAL;
}

方法10:isCompletedNormally()

public final boolean isCompletedNormally()

returns:
如果此任务已经完成且没有抛异常和被取消,返回true。

  public final boolean isCompletedNormally() {
    return (status & DONE_MASK) == NORMAL;
}

方法11:getException()

public final Throwable getException()

returns:
返回异常

源码浅读:

 public final Throwable getException() {
    int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() :
            getThrowableException());
}

如果任务没有完成或没有抛出异常,则返回null。
如果任务被取消了则返回CancellationException。
如果任务出现了异常,就返回该异常。

11.ForkJoinPool

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  1. ForkJoinPool(int parallelism): 创建一个包含parallelism个并行线程的ForkJoinPool。
  2. ForkJoinPool():以Runtime.availableProcessors()方法的返回值作为parallellism参数来构建ForkJoinPool。

Java8为ForkJoinPool增加了通用池功能。ForkJoinPool类通过如下两个静态方法提供通用池功能。

  1. ForkJoinPool commonPool(): 该方法返回一个通用池,通用池的运行状态不会受shutdown()或shutdownNow()方法的影响。当然,如果程序直接执行System.exit(0);来终止虚拟机,通用池以及通用池中正在执行的任务都会被自动终止。
  2. int getCommonPoolParallelism():该方法返回通用池的并行级别。

常用方法介绍

方法1. execute()

public void execute(ForkJoinTask<?> task)

异步执行任务

源码如下:

  public void execute(ForkJoinTask<?> task) {
      if (task == null)
          throw new NullPointerException();
      externalPush(task);
  }

另一个版本 public void execute(Runnable task)

执行Runnable对象。

源码如下:

public void execute(Runnable task) {
      if (task == null)
          throw new NullPointerException();
      ForkJoinTask<?> job;
      if (task instanceof ForkJoinTask<?>) // avoid re-wrap
          job = (ForkJoinTask<?>) task;
      else
          job = new ForkJoinTask.RunnableExecuteAction(task);
      externalPush(job);
}

RunnableExecuteAction是ForkJoinTask的静态内部类,继承ForkJoinTask类。

这里使用ForkJoinTask.RunnableExecuteAction方法,将Runnable包装成了ForkJoinTask。

方法2.invoke()

public <T> T invoke(ForkJoinTask<T> task)

同步执行任务。直到任务完成返回结果。如果执行过程中遇到未受查异常或Error, 将重新抛出,作为这次调用的结果。

源码如下:

public <T> T invoke(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();
}

invoke与execute的区别是多了task.join()。task.join()会阻塞线程,还会抛出异常。

方法3: awaitTermination()

public boolean awaitTermination(long timeout, TimeUnit unit) ​ throws InterruptedException

等待任务结束。

部分源码:

if (this == common) {
        awaitQuiescence(timeout, unit);
        return false;
}

这段代码的意思是,如果this是通用池,就执行awaitQuiescence()方法,并返回false。因此,ForkJoinPool.commonPool().awaitTermination 等同于
ForkJoinPool.commonPool(). awaitQuiescence。

这个方法会在线程池shutdown后任务完成,或者出现超时,或者当前线程中断时返回。

方法4: commonPool()

public static ForkJoinPool commonPool()

返回通用池池实例。这个池是静态构建的;其运行状态不受尝试shutdown()或shutdownNow()的影响。使用commonPool通常可以帮助应用程序中多种需要进行归并计算的任务共享计算资源,从而使后者发挥最大作用(ForkJoinPools中的工作线程在闲置时会被缓慢回收,并在随后需要使用时被恢复)

方法5: submit()

returns:

返回ForkJoinTask对象

版本1 :public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
      if (task == null)
          throw new NullPointerException();
      externalPush(task);
      return task;
  }

提交一个ForkJoinTask 用于执行。

该版本与public void execute(ForkJoinTask<?> task)区别是:多了一个ForkJoinTask类型的返回值。

版本2: public <T> ForkJoinTask<T> submit(Callable<T> task)

public <T> ForkJoinTask<T> submit(Callable<T> task) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
    externalPush(job);
    return job;
}

提交一个有返回值的任务用于执行,并返回 Future。该 Future 的 get 方法在任务成功完成时将会返回该任务的结果

AdaptedCallable作为ForkJoinTask的静态内部类,目的是适配Callable类型的task,使之能够在ForkJoinPool中运行。

AdaptedCallable继承了ForkJoinTask类和RunnableFuture接口(实现run方法)。

版本3:public <T> ForkJoinTask<T> submit(Runnable task, T result)

   public <T> ForkJoinTask<T> submit(Runnable task, T result) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
    externalPush(job);
    return job;
}

提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。

AdaptedRunnable作为ForkJoinTask的静态内部类,目的是适配Runnable类型的task,使之能够在ForkJoinPool中运行。

AdaptedRunnable继承了ForkJoinTask类和RunnableFuture接口(实现run方法)。

版本4: public ForkJoinTask<?> submit(Runnable task)

 public ForkJoinTask<?> submit(Runnable task) {
      if (task == null)
          throw new NullPointerException();
      ForkJoinTask<?> job;
     if (task instanceof ForkJoinTask<?>) // avoid re-wrap
          job = (ForkJoinTask<?>) task;
      else
          job = new ForkJoinTask.AdaptedRunnableAction(task);
      externalPush(job);
      return job;
}

这个版本task没有返回值。

提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。

AdaptedRunnableAction作为ForkJoinTask的静态内部类,目的是适配Runnable类型的task,使之能够在ForkJoinPool中运行。

AdaptedRunnableAction继承了ForkJoinTask类和RunnableFuture接口(实现run方法)。

方法6: invokeAll()

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

执行给定的任务列表,返回持有任务状态和结果的 Future 列表。 Future.isDone()对于返回列表的每个元素都是true。注意:已完成的任务,可能正常终止,也可能通过抛出异常终止。如果在进行此操作时修改了给定集合,则此方法的结果未定义。

方法7: getActiveThreadCount()

public int getActiveThreadCount()

返回当前正在窃取任务或执行任务的线程数的估计值,此方法可能高估活动线程的数量。

方法8: getStealCount()

public long getStealCount()

当前ForkJoinPool线程池内部各个work queue间发生的“工作窃取”操作的总次数,是个估计值。

方法9:getParallelism()

public int getParallelism()
返回这个线程池的并行级别。

方法10:getPoolSize()

public int getPoolSize()
返回已启动但尚未终止的工作线程数

方法11: isTerminated()

public boolean isTerminated()
如果所有任务在shutdown后都完成,则返回true。线程没有shutdown时,这个方法返回false。

方法12: isTerminating()

public boolean isTerminating()
如果此线程池正在终止任务但尚未全部终止,返回true。

12.最佳实践

  1. 在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个原因,一般来说来它实例化一次,然后把实例保存在静态字段,使之成为单例,这样就可以在软件中任何部分方便地重用了。
  2. 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
  3. 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
  4. get方法可以得到当前结果,不过一般不太使用。因为它可能抛出已检查异常,而在compute方法中不允许抛出这些异常。
  5. 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
  6. 调用使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个。
  7. 你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)
上一篇 下一篇

猜你喜欢

热点阅读