一篇讲明白FeatureTask

2020-09-19  本文已影响0人  一个追寻者的故事

FeatureTask 我之前是真没见过,也没用过。不过不是吃饱撑了没事研究新的类,而是在 Android AsyncTask 的实现中使用到了 FeatureTaskAsyncTask 的实现基本上就是在 FeatureTask基础上套了个壳。所以想理解AsyncTask 必须先理解FeatureTask。那为什么不和AsyncTask一起讲,是因为FeatureTask 是 jdk concurrent 包中为了解决某一类问题(后面会讲)而设计的。另外拆开来讲,有助于更好的理解每块的功能。 好的,废话少说,系好安全带,准备发车了。


文章目录:

一、前言
二、正文

一、前言

正式开始之前,为了更好的理解FeatureTask,有必要先了解一下这两个类:LockSupportsun.misc.Unsafe。原因很简单,因为FeatureTask中使用了它们。

1.1、 LockSupport
public class LockSupport {
    // 当前线程放弃线程调度,直到获得许可。
    // 如果获得了许可,就会立刻返回。否则线程当前线程放弃线程调度,进入休眠状态。
    // 如下几种情况会被唤醒,从而继续执行:
    // 1.其它线程执行unpark唤醒当前线程
    // 2.其它线程执行 Thread#interrupt 打断当前线程。
    public static void park() 
    // 使线程获取许可,从而继续执行。如果之前线程时blocking,那么它将编程非blocking的。
    public static void unpark(Thread thread) 
}

功能类似 wait/notify,但是有一些区别:
1、park/unpark 不要求获取对象的锁。
2、park 不会释放线程持有的锁。
3、假如park 时线程处于blocking状态,Thread#interrupt之后不会抛出Exception

小例子:

class TestThread(name: String) : Thread(name) {
    override fun run() {
        println("$name: running")
        LockSupport.park()
        if (Thread.interrupted()) {
            println("$name: 被中断了")
        }
        println("$name: 继续执行了")

    }
}
fun main() {
    val t1 = TestThread("T1")
    val t2 = TestThread("T2")
    t1.start()
    t2.start()

    TimeUnit.SECONDS.sleep(5)
    //会使t1获得许可,从而继续执行。
    LockSupport.unpark(t1)      
    // t2被中断,t2处于blocking状态会被唤醒。
    t2.interrupt()

    t1.join()
    t2.join()
}

结果:

T1: running
T2: running
T1: 继续执行了
T2: 被中断了
T2: 继续执行了

如果先执行 unpark,再执行park,park执行时会认为获得了许可,立即返回。

1.2、 sun.misc.Unsafe

sun.misc.Unsafe 这个类名字 Unsafe - 不安全,它提供了一些可以直接绕过 jvm安全检查的一些机制(例如直接分配内存、回收内存),如果对它的实现不是特别清楚的话,用起来应该是危险的,因此仅开放给了JDK使用,当然非要用也不是不能,利用反射可以使用。下面找几个典型的方法:

public final class Unsafe {
// CAS 
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

// memory
public native long allocateMemory(long var1);
public native void freeMemory(long var1);

public native long objectFieldOffset(Field var1);
public native long staticFieldOffset(Field var1);
...
}

提供了CPU硬件指令级别的原子操作:compare - and - swap 的支持,我们熟悉的 Atomic 原子类的操作就是基于此实现的。
还有一些内存相关的操作:分配内存、释放内存
我们重点关注一下:objectFieldOffset用于获取非静态属性Field在对象实例中的偏移量,然后利用偏移量可以通过 CAS 更新对象实例中的属性,即可实现多线程同步机制,比使用 synchronized 加锁效率要高。使用 CAS 有一种场景:只让第一次修改对象实例中的属性生效,在多线程情况下可以使用synchronized来保证,也可以使用 CAS,我们来看一个例子:

class Bean {
    // state属性
    private volatile int state;  
    private static final int NEW          = 111;
    private static final int FINISHED     = 112;
    private static final int CANCELED     = 113;
    public Bean(){
        this.state = NEW;
    }
    public void doSomething(){
        U.compareAndSwapInt(this, STATE, NEW, FINISHED);
    }
    public void doSomething2(){
        U.compareAndSwapInt(this, STATE, NEW, CANCELED);
    }
    public int getState(){
        return state;
    }
    private static sun.misc.Unsafe U;
    private static final long STATE;       // state 的偏移地址
    static {
        try {
            // 利用反射获取 Unsafe实例
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U = (sun.misc.Unsafe) f.get(null);
            
            // 获取 state 属性的偏移量
            STATE = U.objectFieldOffset(Bean.class.getDeclaredField("state"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }
}

public static void main(String[] args) {
        Bean b = new Bean();
        b.doSomething();
        b.doSomething2();
        System.out.println("b#state: " + b.getState());
}

结果:

b#state: 112

上述例子中 doSomething 、 doSomething2 只有一个方法修改 state 属性生效,即便在多线程情况下,也能保证同步。

更多关于 sun.misc.Unsafe 的用法自行搜索

二、正文

2.1 FeatureTask 是什么,解决什么问题

我们知道线程的设计是基于 Thread-Runnable 模式的。 即没有办法直接从线程处理的异步任务中返回一个结果。因为Runnable的设计是这样的:

public interface Runnable {
    public abstract void run();
}

run方法没有参数、没有返回值。

FeatureTask 就是一个基于现有的 Thread-Runnable 模式,实现了一个可以获取异步任务返回值的机制。

FeatureTask类图

FeatureTask 是一个实现了 RunnableFeature 接口的 具体实现类。
RunnableFeature 接口 = Runnable 接口 + Feature接口 「没有多出一毛钱」

也就是说 FeatureTask 首先是一个 Runnable(无参、无返回值)
再看一下 Feature 的定义就知道了 FeatureTask 是怎么实现的 获取异步任务返回值的。

// V 代表了返回值类型
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    // 获取异步任务处理结果返回值,会一直阻塞调用者。直到异步任务处理结束,或者异步出现异常,又或者调用get()的线程被打断。
    V get() throws InterruptedException, ExecutionException;
    // 增加了超时机制
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Feature 就代表了 异步任务处理 返回结果的。 定义了任务是否已经完成、取消任务 和 获取任务返回值的行为。 即 Feature就是为了解决异步任务返回值而定义的

也即是说 FeatureTask即是一个普通的Runnable,可以被线程执行。又可以从它身上获取到异步任务处理的结果。

到此是不是有一瞬间的灵感,大概知道怎么使用 FeatureTask了。类似如下这种:

// 注意这是示意代码,不能真的运行。

val task = FeatureTask()    // 定义一个Runnable
val thread = Thread(task)  
thread.start()  // 开启一个线程
val result = task.get()  // 等待异步任务处理完成后给出返回结果,此步骤阻塞。

如上示意代码的整体逻辑是没问题的。 接下来就是我该如何把我的异步任务交给 FeatureTask 去执行呢(因为我们知道开启的线程肯定会去执行 FeatureTask#run ),难道在 FeatureTask 构造的时候再传一个 Runnable进去,此Runnable用来封装我们的异步任务代码片段?可是这个Runnable还是没有返回值呀,FeatureTask 也拿不到异步任务返回值,又怎么返回给我们呢。这里引入另一个类:Callable

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

CallableRunnable 比起来的区别是:1、有返回值类型 2、方法可以抛出异常。

没错。Callable 就是用来替代 Runnable 让我们封装 有返回值的 异步任务代码片段的。我们先看一个 FeatureTask使用的代码示例:

data class ResultData(var flag: String = "")

class MyCall : Callable<ResultData>{
    override fun call(): ResultData {
        println("this is myCall running, threadName: ${Thread.currentThread().name}")

        // do something wasting time

        return ResultData("callable")
    }
}

fun main() {
    val executors = Executors.newCachedThreadPool()
    val task = FutureTask<ResultData>(MyCall())
    executors.execute(task)
    // 获取异步任务返回值, maybe blocking
    val result = task.get()
    println("the call result is : $result")
}

结果:

this is myCall running, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)

至此,对FeatureTask的使用应该有了一个基本的认识。

2.2 FeatureTask 实现原理

原理这块基本上围绕着这个问题进行:
1、如何实现获取异步任务的结果。
2、如果异步任务执行过程中出现异常,会怎么处理。
3、如果在异步任务执行的过程中被取消了,会怎么处理。
4、get 超时机制如何实现

如果想要实现类似 FeatueTask的功能,上边几个问题是绕不开的。上边几个问题弄清楚了,原理也就清楚了。

2.2.1 如何实现获取异步任务的结果

首先FeatureTask 内部有五种状态:

内部状态的转化有一下几种:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
// 代码仅包括核心的逻辑,完整逻辑看源码
public class FutureTask<V>{
    ...
    private Callable<V> callable;  // 最终运行的异步任务
    private Object outcome;  // 执行完成后用于返回上层的异步任务处理结果
    public FutureTask(Callable<V> callable)   //构造函数

    //非完整逻辑,状态判断等逻辑都删除了
    public void run() {
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 异步任务执行出现了未捕获的异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);    // 异步任务正常执行完成
            }
        } finally {
           ...
        }
    }

    //报告异步任务执行成功
    protected void set(V v) {
        // 尝试修改内部状态
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;  //存入异步任务的结果
            U.putOrderedInt(this, STATE, NORMAL); // 最终的状态
            finishCompletion();
        }
    }
    // 报告异步任务执行出现异常
     protected void setException(Throwable t) {
        // 尝试修改内部状态
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;    // Exception 存入结果
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // 最终状态
            finishCompletion();
        }
    }
    
    // 用户主动取消异步任务(不一定会成功,因为CAS保证了一旦状态从NEW改变了,就再也无法接受其它的情况了)
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果起始状态不是NEW,则直接取消失败,直接返回。
        if (!(state == NEW &&
              U.compazreAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;


        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    private void finishCompletion() {
        ...
        done();
        callable = null;        // to reduce footprint
    }
     
    // 异步任务结束后的回调,无论是正常执行结束、还是异步任务出现了Exception、还是用户cancel掉了异步任务,最终都会执行done。这几种情况是互斥的,通过CAS保证了,只能有一种实现。
    protected void done() { }
    ...
}

上边的代码已经削减了很多,只留下了处理这个话题最核心的代码。

Callable异步任务的执行,最终依赖的是Runnable#run方法。即在 FeatureTask#run中调用Callable#call的调用,在子线程中完成了一次异步任务的执行。

异步任务的执行有几种情况:
1、正常处理完成 通过set方法报告结果
2、有未捕获的异常 通过setException方法报告结果
3、用户取消异步任务 通过cancel方法报告结果

上述几种情况的处理是基于CAS来实现原子操作,也即是说只有一种情况会最终执行。无论走哪一种情况,最终都会报告结果:finishCompletion,也就是说最终都会执行 done 方法, done 这相当于一个回调,即最后一步通知用户的回调。所以基于此,我们可以在done方法里去取异步任务处理的结果,好处是因为异步任务已经结束,最终结果的获取就不会blocking了。

改进一下上边的 FeatureTask的使用:

    val executors = Executors.newCachedThreadPool()
    val task = object : FutureTask<ResultData>(MyCall()){
        override fun done() {
            println("this is call done, threadName: ${Thread.currentThread().name}")
            val result = get()
            println("the call result is : $result")
        }
    }

    executors.execute(task)

执行结果:

this is myCall running, threadName: pool-1-thread-1
this is call done, threadName: pool-1-thread-1
the call result is : ResultData(flag=callable)

好了,下一步看一下,FeatureTask是以何种方式把异步任务处理结果报告给使用者呢,答案就在 get() 方法中:

  public V get() throws InterruptedException, ExecutionException {
        ...
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

我们看到了调用 ge() 方法的时候,如果异步任务处理正常(NORMAL),则直接返回异步任务的结果;如果用户取消了异步任务(CANCELLED、INTERRUPTED),则直接抛出异常 CancellationException ; 如果异步任务有未捕获的异常(EXCEPTIONAL),则会抛出 ExecutionException 异常。最终把主动权交给上层用户。

一个常见的处理模型:

try {
    get()
   // do something
} catch (InterruptedException e) {
    // do something                       
} catch (ExecutionException e) {                    
    // do something
} catch (CancellationException e) {          
    // do something         
}

至此整个流程就串起来了,细节的东西需要大家实际去看代码。

最后 get() 还可能会抛出 InterruptedException,这是处理哪一种情况呢?因为我们知道 get() 会阻塞调用者所在的线程,假如被其它线程执行 Thread#interrupt 打断了所阻塞的线程,get()方法就会抛出 InterruptedException,然后具体被打断后的处理逻辑交给用户去处理。

2.2.2 get(long timeout, TimeUnit unit) 超时机制如何实现

get(long timeout, TimeUnit unit)提供了超时机制,在指定时间内还没有返回的话,就会抛出异常 TimeoutException,然后把控制权交给使用者。『 和get()相比,会多抛出一个 TimeoutException

每一个FeatureTask 对象都维护了一个因调用此 FeatureTaskget 而进入等待状态的线程的单链表。元素的节点是:

    static final class WaitNode {
        volatile Thread thread;    // 当前节点的线程
        volatile WaitNode next;    // 下一个节点
        WaitNode() { thread = Thread.currentThread(); }
    }

如果get指定了超时时间,会使用 LockSupport#parkNanos将当前线程进入阻塞状态,超过时间后回去检查异步任务运行状态,如果异步任务还没有完成,则会抛出异常 TimeoutException

如果get 没指定超时时间,会使用LockSupport.park 无限期阻塞当前线程,直到任务处理完成调用 LockSupport.unpark 恢复运行。

我们知道 finishCompletion 无论如何都会调用,不管异步任务正常处理、还是有未捕获的异常,或者被用户取消了都会调用。之前讲解流程的时候,忽略了这个方法中关于 等待节点的处理。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
       ...
    }

会遍历自己维护的 WaitNode 单链表,依次调用对应节点的 LockSupport.unpark(t); 让之前阻塞的线程都恢复运行。这么以来,所有阻塞的线程在这里都能得到异步任务的处理的结果。

对于超时机制的处理,这里讲的只是基本的处理。更细节的东西还是要参考代码去学习。

很有意思的一个事情是,和超时相关的话题这已经是第二次了。 第一次是讲 okio中的超时机制(这是一个典型的 生产者/消费者 模型的实践)。 第二次是讲 获取FeatureTask 异步任务处理结果的超时机制。 其实无非就是利用 wait/notify 或者 park/unpark 去处理多线程之间的关系。这两部分的超时机制设计的都很精彩,有时间建议大家去看看。

上一篇下一篇

猜你喜欢

热点阅读