一篇讲明白FeatureTask
FeatureTask
我之前是真没见过,也没用过。不过不是吃饱撑了没事研究新的类,而是在 AndroidAsyncTask
的实现中使用到了FeatureTask
,AsyncTask
的实现基本上就是在FeatureTask
基础上套了个壳。所以想理解AsyncTask
必须先理解FeatureTask
。那为什么不和AsyncTask一起讲,是因为FeatureTask 是 jdk concurrent 包中为了解决某一类问题(后面会讲)而设计的。另外拆开来讲,有助于更好的理解每块的功能。 好的,废话少说,系好安全带,准备发车了。
文章目录:
一、前言
二、正文
一、前言
正式开始之前,为了更好的理解FeatureTask
,有必要先了解一下这两个类:LockSupport
和 sun.misc.Unsafe
。原因很简单,因为FeatureTask
中使用了它们。
1.1、 LockSupport
public class LockSupport {
// 当前线程放弃线程调度,直到获得许可。
// 如果获得了许可,就会立刻返回。否则线程当前线程放弃线程调度,进入休眠状态。
// 如下几种情况会被唤醒,从而继续执行:
// 1.其它线程执行unpark唤醒当前线程
// 2.其它线程执行 Thread#interrupt 打断当前线程。
public static void park()
// 使线程获取许可,从而继续执行。如果之前线程时blocking,那么它将编程非blocking的。
public static void unpark(Thread thread)
}
功能类似 wait/notify
,但是有一些区别:
1、park/unpark
不要求获取对象的锁。
2、park
不会释放线程持有的锁。
3、假如park
时线程处于blocking状态,Thread#interrupt之后不会抛出Exception
小例子:
class TestThread(name: String) : Thread(name) {
override fun run() {
println("$name: running")
LockSupport.park()
if (Thread.interrupted()) {
println("$name: 被中断了")
}
println("$name: 继续执行了")
}
}
fun main() {
val t1 = TestThread("T1")
val t2 = TestThread("T2")
t1.start()
t2.start()
TimeUnit.SECONDS.sleep(5)
//会使t1获得许可,从而继续执行。
LockSupport.unpark(t1)
// t2被中断,t2处于blocking状态会被唤醒。
t2.interrupt()
t1.join()
t2.join()
}
结果:
T1: running
T2: running
T1: 继续执行了
T2: 被中断了
T2: 继续执行了
如果先执行 unpark,再执行park,park执行时会认为获得了许可,立即返回。
1.2、 sun.misc.Unsafe
sun.misc.Unsafe 这个类名字 Unsafe
- 不安全,它提供了一些可以直接绕过 jvm安全检查的一些机制(例如直接分配内存、回收内存),如果对它的实现不是特别清楚的话,用起来应该是危险的,因此仅开放给了JDK使用,当然非要用也不是不能,利用反射可以使用。下面找几个典型的方法:
public final class Unsafe {
// CAS
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// memory
public native long allocateMemory(long var1);
public native void freeMemory(long var1);
public native long objectFieldOffset(Field var1);
public native long staticFieldOffset(Field var1);
...
}
提供了CPU硬件指令级别的原子操作:compare - and - swap 的支持,我们熟悉的 Atomic 原子类的操作就是基于此实现的。
还有一些内存相关的操作:分配内存、释放内存
我们重点关注一下:objectFieldOffset
:用于获取非静态属性Field在对象实例中的偏移量,然后利用偏移量可以通过 CAS 更新对象实例中的属性,即可实现多线程同步机制,比使用 synchronized
加锁效率要高。使用 CAS 有一种场景:只让第一次修改对象实例中的属性生效,在多线程情况下可以使用synchronized来保证,也可以使用 CAS,我们来看一个例子:
class Bean {
// state属性
private volatile int state;
private static final int NEW = 111;
private static final int FINISHED = 112;
private static final int CANCELED = 113;
public Bean(){
this.state = NEW;
}
public void doSomething(){
U.compareAndSwapInt(this, STATE, NEW, FINISHED);
}
public void doSomething2(){
U.compareAndSwapInt(this, STATE, NEW, CANCELED);
}
public int getState(){
return state;
}
private static sun.misc.Unsafe U;
private static final long STATE; // state 的偏移地址
static {
try {
// 利用反射获取 Unsafe实例
Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
U = (sun.misc.Unsafe) f.get(null);
// 获取 state 属性的偏移量
STATE = U.objectFieldOffset(Bean.class.getDeclaredField("state"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
public static void main(String[] args) {
Bean b = new Bean();
b.doSomething();
b.doSomething2();
System.out.println("b#state: " + b.getState());
}
结果:
b#state: 112
上述例子中 doSomething 、 doSomething2 只有一个方法修改 state
属性生效,即便在多线程情况下,也能保证同步。
更多关于 sun.misc.Unsafe 的用法自行搜索
二、正文
2.1 FeatureTask 是什么,解决什么问题
我们知道线程的设计是基于 Thread-Runnable 模式的。 即没有办法直接从线程处理的异步任务中返回一个结果。因为Runnable的设计是这样的:
public interface Runnable {
public abstract void run();
}
run
方法没有参数、没有返回值。
FeatureTask 就是一个基于现有的 Thread-Runnable 模式,实现了一个可以获取异步任务返回值的机制。
FeatureTask类图FeatureTask 是一个实现了 RunnableFeature 接口的 具体实现类。
RunnableFeature 接口 = Runnable
接口 + Feature
接口 「没有多出一毛钱」
也就是说 FeatureTask
首先是一个 Runnable
(无参、无返回值)
再看一下 Feature 的定义就知道了 FeatureTask
是怎么实现的 获取异步任务返回值的。
// V 代表了返回值类型
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
// 获取异步任务处理结果返回值,会一直阻塞调用者。直到异步任务处理结束,或者异步出现异常,又或者调用get()的线程被打断。
V get() throws InterruptedException, ExecutionException;
// 增加了超时机制
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Feature
就代表了 异步任务处理 返回结果的。 定义了任务是否已经完成、取消任务 和 获取任务返回值的行为。 即 Feature就是为了解决异步任务返回值而定义的
也即是说 FeatureTask
即是一个普通的Runnable,可以被线程执行。又可以从它身上获取到异步任务处理的结果。
到此是不是有一瞬间的灵感,大概知道怎么使用 FeatureTask
了。类似如下这种:
// 注意这是示意代码,不能真的运行。
val task = FeatureTask() // 定义一个Runnable
val thread = Thread(task)
thread.start() // 开启一个线程
val result = task.get() // 等待异步任务处理完成后给出返回结果,此步骤阻塞。
如上示意代码的整体逻辑是没问题的。 接下来就是我该如何把我的异步任务交给 FeatureTask
去执行呢(因为我们知道开启的线程肯定会去执行 FeatureTask#run ),难道在 FeatureTask
构造的时候再传一个 Runnable进去,此Runnable用来封装我们的异步任务代码片段?可是这个Runnable还是没有返回值呀,FeatureTask
也拿不到异步任务返回值,又怎么返回给我们呢。这里引入另一个类:Callable
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable
和 Runnable
比起来的区别是:1、有返回值类型 2、方法可以抛出异常。
没错。Callable
就是用来替代 Runnable
让我们封装 有返回值的 异步任务代码片段的。我们先看一个 FeatureTask
使用的代码示例:
data class ResultData(var flag: String = "")
class MyCall : Callable<ResultData>{
override fun call(): ResultData {
println("this is myCall running, threadName: ${Thread.currentThread().name}")
// do something wasting time
return ResultData("callable")
}
}
fun main() {
val executors = Executors.newCachedThreadPool()
val task = FutureTask<ResultData>(MyCall())
executors.execute(task)
// 获取异步任务返回值, maybe blocking
val result = task.get()
println("the call result is : $result")
}
结果:
this is myCall running, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)
至此,对FeatureTask
的使用应该有了一个基本的认识。
2.2 FeatureTask 实现原理
原理这块基本上围绕着这个问题进行:
1、如何实现获取异步任务的结果。
2、如果异步任务执行过程中出现异常,会怎么处理。
3、如果在异步任务执行的过程中被取消了,会怎么处理。
4、get 超时机制如何实现
如果想要实现类似 FeatueTask
的功能,上边几个问题是绕不开的。上边几个问题弄清楚了,原理也就清楚了。
2.2.1 如何实现获取异步任务的结果
首先FeatureTask
内部有五种状态:
- NEW 初始化状态
- COMPLETING 处理完异步任务等待结束的状态(短暂)
- NORMAL 正常处理完异步任务后的状态
- EXCEPTIONAL 异步任务异常后的状态
- CANCELLED 用户取消异步任务执行后的状态
- INTERRUPTING 异步任务被打断中的状态(短暂)
- INTERRUPTED 异步任务被打断后的状态
内部状态的转化有一下几种:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
// 代码仅包括核心的逻辑,完整逻辑看源码
public class FutureTask<V>{
...
private Callable<V> callable; // 最终运行的异步任务
private Object outcome; // 执行完成后用于返回上层的异步任务处理结果
public FutureTask(Callable<V> callable) //构造函数
//非完整逻辑,状态判断等逻辑都删除了
public void run() {
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 异步任务执行出现了未捕获的异常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); // 异步任务正常执行完成
}
} finally {
...
}
}
//报告异步任务执行成功
protected void set(V v) {
// 尝试修改内部状态
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v; //存入异步任务的结果
U.putOrderedInt(this, STATE, NORMAL); // 最终的状态
finishCompletion();
}
}
// 报告异步任务执行出现异常
protected void setException(Throwable t) {
// 尝试修改内部状态
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t; // Exception 存入结果
U.putOrderedInt(this, STATE, EXCEPTIONAL); // 最终状态
finishCompletion();
}
}
// 用户主动取消异步任务(不一定会成功,因为CAS保证了一旦状态从NEW改变了,就再也无法接受其它的情况了)
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果起始状态不是NEW,则直接取消失败,直接返回。
if (!(state == NEW &&
U.compazreAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
private void finishCompletion() {
...
done();
callable = null; // to reduce footprint
}
// 异步任务结束后的回调,无论是正常执行结束、还是异步任务出现了Exception、还是用户cancel掉了异步任务,最终都会执行done。这几种情况是互斥的,通过CAS保证了,只能有一种实现。
protected void done() { }
...
}
上边的代码已经削减了很多,只留下了处理这个话题最核心的代码。
Callable
异步任务的执行,最终依赖的是Runnable#run方法。即在 FeatureTask#run
中调用Callable#call
的调用,在子线程中完成了一次异步任务的执行。
异步任务的执行有几种情况:
1、正常处理完成 通过set
方法报告结果
2、有未捕获的异常 通过setException
方法报告结果
3、用户取消异步任务 通过cancel
方法报告结果
上述几种情况的处理是基于CAS来实现原子操作,也即是说只有一种情况会最终执行。无论走哪一种情况,最终都会报告结果:finishCompletion
,也就是说最终都会执行 done
方法, done
这相当于一个回调,即最后一步通知用户的回调。所以基于此,我们可以在done方法里去取异步任务处理的结果,好处是因为异步任务已经结束,最终结果的获取就不会blocking了。
改进一下上边的 FeatureTask
的使用:
val executors = Executors.newCachedThreadPool()
val task = object : FutureTask<ResultData>(MyCall()){
override fun done() {
println("this is call done, threadName: ${Thread.currentThread().name}")
val result = get()
println("the call result is : $result")
}
}
executors.execute(task)
执行结果:
this is myCall running, threadName: pool-1-thread-1
this is call done, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)
好了,下一步看一下,FeatureTask
是以何种方式把异步任务处理结果报告给使用者呢,答案就在 get()
方法中:
public V get() throws InterruptedException, ExecutionException {
...
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
我们看到了调用 ge()
方法的时候,如果异步任务处理正常(NORMAL),则直接返回异步任务的结果;如果用户取消了异步任务(CANCELLED、INTERRUPTED),则直接抛出异常 CancellationException
; 如果异步任务有未捕获的异常(EXCEPTIONAL),则会抛出 ExecutionException
异常。最终把主动权交给上层用户。
一个常见的处理模型:
try {
get()
// do something
} catch (InterruptedException e) {
// do something
} catch (ExecutionException e) {
// do something
} catch (CancellationException e) {
// do something
}
至此整个流程就串起来了,细节的东西需要大家实际去看代码。
最后 get() 还可能会抛出 InterruptedException,这是处理哪一种情况呢?因为我们知道 get() 会阻塞调用者所在的线程,假如被其它线程执行 Thread#interrupt 打断了所阻塞的线程,get()方法就会抛出 InterruptedException,然后具体被打断后的处理逻辑交给用户去处理。
2.2.2 get(long timeout, TimeUnit unit) 超时机制如何实现
get(long timeout, TimeUnit unit)
提供了超时机制,在指定时间内还没有返回的话,就会抛出异常 TimeoutException
,然后把控制权交给使用者。『 和get()相比,会多抛出一个 TimeoutException
』
每一个FeatureTask
对象都维护了一个因调用此 FeatureTask
的 get
而进入等待状态的线程的单链表。元素的节点是:
static final class WaitNode {
volatile Thread thread; // 当前节点的线程
volatile WaitNode next; // 下一个节点
WaitNode() { thread = Thread.currentThread(); }
}
如果get指定了超时时间,会使用 LockSupport#parkNanos
将当前线程进入阻塞状态,超过时间后回去检查异步任务运行状态,如果异步任务还没有完成,则会抛出异常 TimeoutException
。
如果get 没指定超时时间,会使用LockSupport.park
无限期阻塞当前线程,直到任务处理完成调用 LockSupport.unpark
恢复运行。
我们知道 finishCompletion
无论如何都会调用,不管异步任务正常处理、还是有未捕获的异常,或者被用户取消了都会调用。之前讲解流程的时候,忽略了这个方法中关于 等待节点的处理。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
...
}
会遍历自己维护的 WaitNode
单链表,依次调用对应节点的 LockSupport.unpark(t);
让之前阻塞的线程都恢复运行。这么以来,所有阻塞的线程在这里都能得到异步任务的处理的结果。
对于超时机制的处理,这里讲的只是基本的处理。更细节的东西还是要参考代码去学习。
很有意思的一个事情是,和超时相关的话题这已经是第二次了。 第一次是讲
okio
中的超时机制(这是一个典型的 生产者/消费者 模型的实践)。 第二次是讲 获取FeatureTask
异步任务处理结果的超时机制。 其实无非就是利用wait/notify
或者park/unpark
去处理多线程之间的关系。这两部分的超时机制设计的都很精彩,有时间建议大家去看看。