执行器
在之前的例子中,线程执行的任务,在Runnable对象中定义,和线程,在Thread对象中定义,两者之间总是有一种密切的关联。这样的机制对小型应用管用,但是在大型的应用中,把线程管理和应用的其他部分分离会更好。负责封装这些功能的对象被成为Executors执行器。之后的子章节详细介绍了执行器的细节。
- Executor接口定义了三种执行器对象类型。
- Thread Pools是最常见的执行器实现。
- Fork/Join是JAVA 7中新引入的一种利用多处理器优势的框架。
Executor Interfaces
java.util.concurrent包定义了三种执行器接口:
- Executor,一个支持发起新任务的简单接口。
- ExecutorService,Executor的一个子接口,它增加了帮助生命期管理的功能,可以用在单个的任务和执行器自己之上。
- ScheduledExecutorService,是ExecutorService的子接口,支持未来and/or阶段性的任务执行。
执行器接口
执行器接口提供了一种简单的方法,execute,被设计成线程创造动作的一个简单代替。如果r是一个Runnable对象,那么e是一个可以替换的Executor对象,(new Thread(r)).start(); 可以替换成e.execute(r);。然而,execute的定义没有那么具体。底层次的做法是建立一个线程,然后立刻执行。依赖于Executor的实现,execute也会做同样的事,但是更有可能会使用一个现有的工人线程来运行r,或者是把r放进一个队列,等待工人线程可用。(我们将在线程池的章节讲到工人线程。)
java.util.concurrent包中的执行器实现是被设计成能充分利用更高级的ExecutorService和ScheduledExecutorService接口,尽管他们也是基于基本的Executor接口工作。
ExecutorService接口
ExecutorService接口使用一个更加全能的方法submit来支持execute方法。和execute方法一样,submit方法接受Runnable对象,但也接受Callable对象,Callable对象允许任务拥有一个返回值。submit方法会返回一个Future对象,它可以获取Callable的返回值,以及管理Callable和Runnable任务的状态。
ExecutorService也为提交大规模Callable集合对象提供了方法。最后,ExecutorService提供了一系列的方法来管理执行器的关闭。为了支持立刻关闭,任务需要正确处理中断。
ScheduledExecutorService接口
与它的父类ExecutorService相比,ScheduledExecutorService接口提供了schedule方法,该方法可以在一段时间的延迟后执行Runnable或Callable任务。此外,该接口定义了scheduleAtFixedRate方法和scheduleWithFixedDelay方法,他们可以以特定的间隔,反复执行特定的任务。
线程池
java.util.concurrent包中大部分的执行器实现使用了线程池。这些线程池和Runnable、Callable任务分别存在,并且总是被用来运行多任务。
使用工人线程最小程度地减少了由于线程创建带来的开销。新建线程会使用大量的内存,在大规模的应用中,分配和回收很多线程对象会带来巨大的内存管理开销。
一个常见的线程池类型是固定线程池。这种类型的线程池拥有指定数量的正在执行的线程;如果一个线程在运行时意外退出了,另一个新的线程会自动取代它。任务通过内部的队列分配给池子,队列负责保存多余的任务,当任务的数量超过线程数量时。
固定线程池的一个显著的优点是应用会非常小心地使用它。想要理解这一点,考虑一个网页服务器应用,其中的每个HTTP应用都被一个不同的线程处理。如果这个应用在每次获得新线程的时候都创建一个新的线程,那么假如这个系统忽然收到超出它能力的请求数量,它可能会忽然停止响应所有请求,因为这些操作的额外成本超出了系统的能力。
通过限制系统中能创建的线程数量,应用没办法像请求发送速度一样地处理请求,但它能够尽它所能地来服务请求。
创建固定线程池的一个简单方法是使用在java.util.concurrent包中的newFixedThreadPool工厂方法。这个类也提供了其他种类的工厂方法如下所示:
- newCachedThreadPool方法,会创建一个能够扩展线程池尺寸的执行器。这种执行器适合生命周期很短的应用。
- newSingleThreadExecutor方法,创建一次只会执行一个任务的执行器。
- 还有几个工厂方法是上面执行器的ScheduledExecutorService版本。
如果上面提供的方法没有能满足你的要求,构造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例会给你更多的选择。
Fork/Join
Fork/Join框架是ExecutorService接口的一个实现,它可以帮助你利用多处理器。它被设计成针对那些能够被递归分解为更小任务的工作。目的是最大限度地使用可提供地处理器能力来提高应用的性能。
和其他ExecutorService的实现一样,fork/join框架把任务分发给线程池中的若干工人线程。让Fork/Join与众不同的是,它使用了工作偷窃算法。做完了事情的工人线程会从其他繁忙线程中偷任务。
Fork/Join框架的核心类是ForkJoinPool,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心的工作偷窃算法,并且会执行ForkJoinTask进程。
基础用法
使用fork/join框架的第一步,是写能执行一部分工作的代码。你的代码会看上去和下面的很像:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
把这些代码包裹在ForkJoinTask子类中,或是使用它的更为具体的一个子类,比如RecursiveTask或RecursiveAction。
等你的ForkJoinTask子类完成之后,创建一个代表所有工作的对象,然后把它传递给ForkJoinPool实例的invoke方法。
为了清晰而模糊
为了帮助你理解fork/join框架的工作原理,考虑下面的案例。假设你想要涂掉一幅画。原始的画素材由一个整数数组代表,其中每个整数都包含着一个像素的颜色值。混淆后的目标图也是由相同大小的整数数组代表。
实施模糊操作是由每次操作一个像素完成的。每个像素都被赋值为它周围像素的平均值,结果存放在结果数组中。因为这个图像是一个很大的数组,这个过程会花费很长的时间。你可以通过实现算法来使用fork/join,利用多核系统的并行处理能力。下面是一种可能的实现:
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Processing window size; should be odd.
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length - 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}
// Reassemble destination pixel.
int dpixel = (0xff000000 ) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}
你现在实现了抽象的compute()方法,它要么直接进行模糊操作,要么把任务分成两份。一个简单的数组长度阈值决定了这个工作是直接运算,还是分割。
protected static int sThreshold = 100000;
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}