执行器

2019-10-16  本文已影响0人  原创迷恋者

在之前的例子中,线程执行的任务,在Runnable对象中定义,和线程,在Thread对象中定义,两者之间总是有一种密切的关联。这样的机制对小型应用管用,但是在大型的应用中,把线程管理和应用的其他部分分离会更好。负责封装这些功能的对象被成为Executors执行器。之后的子章节详细介绍了执行器的细节。

Executor Interfaces

java.util.concurrent包定义了三种执行器接口:

执行器接口

执行器接口提供了一种简单的方法,execute,被设计成线程创造动作的一个简单代替。如果r是一个Runnable对象,那么e是一个可以替换的Executor对象,(new Thread(r)).start(); 可以替换成e.execute(r);。然而,execute的定义没有那么具体。底层次的做法是建立一个线程,然后立刻执行。依赖于Executor的实现,execute也会做同样的事,但是更有可能会使用一个现有的工人线程来运行r,或者是把r放进一个队列,等待工人线程可用。(我们将在线程池的章节讲到工人线程。)

java.util.concurrent包中的执行器实现是被设计成能充分利用更高级的ExecutorService和ScheduledExecutorService接口,尽管他们也是基于基本的Executor接口工作。

ExecutorService接口

ExecutorService接口使用一个更加全能的方法submit来支持execute方法。和execute方法一样,submit方法接受Runnable对象,但也接受Callable对象,Callable对象允许任务拥有一个返回值。submit方法会返回一个Future对象,它可以获取Callable的返回值,以及管理Callable和Runnable任务的状态。

ExecutorService也为提交大规模Callable集合对象提供了方法。最后,ExecutorService提供了一系列的方法来管理执行器的关闭。为了支持立刻关闭,任务需要正确处理中断。

ScheduledExecutorService接口

与它的父类ExecutorService相比,ScheduledExecutorService接口提供了schedule方法,该方法可以在一段时间的延迟后执行Runnable或Callable任务。此外,该接口定义了scheduleAtFixedRate方法和scheduleWithFixedDelay方法,他们可以以特定的间隔,反复执行特定的任务。

线程池

java.util.concurrent包中大部分的执行器实现使用了线程池。这些线程池和Runnable、Callable任务分别存在,并且总是被用来运行多任务。

使用工人线程最小程度地减少了由于线程创建带来的开销。新建线程会使用大量的内存,在大规模的应用中,分配和回收很多线程对象会带来巨大的内存管理开销。

一个常见的线程池类型是固定线程池。这种类型的线程池拥有指定数量的正在执行的线程;如果一个线程在运行时意外退出了,另一个新的线程会自动取代它。任务通过内部的队列分配给池子,队列负责保存多余的任务,当任务的数量超过线程数量时。

固定线程池的一个显著的优点是应用会非常小心地使用它。想要理解这一点,考虑一个网页服务器应用,其中的每个HTTP应用都被一个不同的线程处理。如果这个应用在每次获得新线程的时候都创建一个新的线程,那么假如这个系统忽然收到超出它能力的请求数量,它可能会忽然停止响应所有请求,因为这些操作的额外成本超出了系统的能力。

通过限制系统中能创建的线程数量,应用没办法像请求发送速度一样地处理请求,但它能够尽它所能地来服务请求。

创建固定线程池的一个简单方法是使用在java.util.concurrent包中的newFixedThreadPool工厂方法。这个类也提供了其他种类的工厂方法如下所示:

如果上面提供的方法没有能满足你的要求,构造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例会给你更多的选择。

Fork/Join

Fork/Join框架是ExecutorService接口的一个实现,它可以帮助你利用多处理器。它被设计成针对那些能够被递归分解为更小任务的工作。目的是最大限度地使用可提供地处理器能力来提高应用的性能。

和其他ExecutorService的实现一样,fork/join框架把任务分发给线程池中的若干工人线程。让Fork/Join与众不同的是,它使用了工作偷窃算法。做完了事情的工人线程会从其他繁忙线程中偷任务。

Fork/Join框架的核心类是ForkJoinPool,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心的工作偷窃算法,并且会执行ForkJoinTask进程。

基础用法

使用fork/join框架的第一步,是写能执行一部分工作的代码。你的代码会看上去和下面的很像:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

把这些代码包裹在ForkJoinTask子类中,或是使用它的更为具体的一个子类,比如RecursiveTask或RecursiveAction。

等你的ForkJoinTask子类完成之后,创建一个代表所有工作的对象,然后把它传递给ForkJoinPool实例的invoke方法。

为了清晰而模糊

为了帮助你理解fork/join框架的工作原理,考虑下面的案例。假设你想要涂掉一幅画。原始的画素材由一个整数数组代表,其中每个整数都包含着一个像素的颜色值。混淆后的目标图也是由相同大小的整数数组代表。

实施模糊操作是由每次操作一个像素完成的。每个像素都被赋值为它周围像素的平均值,结果存放在结果数组中。因为这个图像是一个很大的数组,这个过程会花费很长的时间。你可以通过实现算法来使用fork/join,利用多核系统的并行处理能力。下面是一种可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

你现在实现了抽象的compute()方法,它要么直接进行模糊操作,要么把任务分成两份。一个简单的数组长度阈值决定了这个工作是直接运算,还是分割。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}
上一篇下一篇

猜你喜欢

热点阅读