JAVA-Future

2019-10-30  本文已影响0人  AlanSun2

Future 表示异步计算的结果。提供了一些方法来检查计算是否完成,等待其完成以及检索计算结果。只有在计算完成后才可以使用get方法检索结果,必要时将其阻塞,直到准备就绪为止。取消通过 cancel 方法执行。提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果出于可取消性的目的而使用 Future 而不提供可用的结果,则可以声明 Future<?> 形式的类型,并作为基础任务的结果返回null。

示例:

 interface ArchiveSearcher { String search(String target); }
 class App {
   ExecutorService executor = ...
   ArchiveSearcher searcher = ...
   void showSearch(final String target)
       throws InterruptedException {
     Future<String> future
       = executor.submit(new Callable<String>() {
         public String call() {
             return searcher.search(target);
         }});
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

FutureTask 类是Future的实现,该实现类实现 Runnable 接口,因此可以由执行程序执行。例如,上面带有 Submit 的构造可以替换为:

 FutureTask<String> future =
   new FutureTask<String>(new Callable<String>() {
     public String call() {
       return searcher.search(target);
   }});
 executor.execute(future);

内存一致性影响:异步计算采取的操作发生在另一个线程中相应的 Future.get() 之后的操作之前。

接口结构

public interface Future<V> {
    //尝试取消执行此任务。如果任务已经完成,已经被取消或由于某些其他原因而无法取消,则此尝试将失败。
    //如果成功,并且在调用 cancel 时此任务尚未开始,则该任务永远无法运行。
    //如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应中断执行该任务的线程以尝试停止该任务。
    //mayInterruptIfRunning == true, 表示中断执行中的线程,false 表示让线程正常完成
    boolean cancel(boolean mayInterruptIfRunning);
    //如果此任务在正常完成之前被取消,则返回true。
    boolean isCancelled();
    //如果此任务完成,则返回true。完成可能是由于正常终止,异常或取消引起的,在所有这些情况下,此方法都将返回true。
    boolean isDone();
    //必要时等待计算完成,然后检索其结果
    V get() throws InterruptedException, ExecutionException;
    //必要时最多等待给定时间以完成计算,然后检索其结果(如果有)。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future 有很多实现类,这里主要讲下他的一个实现类 FutureTask。

实现类 FutureTask

一个可取消的异步计算。此类提供 Future 的基本实现,其中包含启动和取消计算,查询以查看计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;如果计算尚未完成,则get方法将阻塞。一旦计算完成,就不能重新启动或取消计算(除非使用 runAndReset() 方法调用计算)。

FutureTask 可用于包装 Callable 或 Runnable 对象。由于 FutureTask 实现 Runnable,因此 FutureTask 可以提交给执行程序以执行。

除了用作独立类之外,此类还提供受保护的功能,这些功能在创建自定义任务类时可能很有用。

类图:


FutureTask.png

可以看到 FutureTask 实现了 RunableFuture,RunableFuture 顾名思义就是可执行的 Future,它是 Runable 和 Future 的结合。RunableFuture 重写了 Runable 的 run 方法,它的其中一个作用就是把执行结果放入返回结果中,具体到 FutureTask 的话就是把 Callable (Runable 也会被包装成 Callable)的返回值由 run -> set(result) 调用路径放入 outcome 成员变量中。

一般的执行过程:

  1. 主线程调用 submit 执行 FutureTask,FutureTask 执行中
  2. 主线程调用 get 方法,判断是否已执行完。如果已执行完,则直接获取结果。如果未执行完,则主线程阻塞直到 run 方法执行完成后去唤醒主线程。其中阻塞和唤醒用的是 LockSupport 的方法。

FutureTask 有一个成员变量 waiters,类型是 WaitNode,是一个静态内部类,它表示等待获取结果的线程链表,它是一个从尾部开始的单向链表,主要作用是存储等待获取结果的线程,在完成时唤醒这些线程:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

重点方法解析

1. run

run 方法实现自 Runable 接口,所以当线程启动的时候会首先运行这个方法来。

public void run() {
    // 1. 如果 state !=  NEW 说明 run 方法已经运行过,直接 return
    // 2. 如果 state == NEW && CAS 竞争 设置 runner 失败,说明已经有别的线程在运行,直接 return
    // NEW 的状态由构造方法初始化,runner 是运行该 Callable 的线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;// 表示运行中是否出现异常
            try {
                result = c.call();// 调用实际自己实现的程序
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);// 出现异常,调用 setException
            }
            // 中间出现异常,则掉过 set
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 如果最终状态 >= INTERRUPTING,则处理中断
        // cancel 方法会通过参数 mayInterruptIfRunning 来设置 state 的值
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看出,润 run 方法只能运行一次。

1.1 setException

运行时出现异常。

protected void setException(Throwable t) {
    // 这里为什么要用 CAS 因为可能会和 cancel 方法产生竞争。
    // 如果竞争失败,说明取消竞争成功,在 cancel 方法承担唤醒的工作,所以直接跳过。
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 竞争成功
        outcome = t; // outcome 为一个 Throwable 
        // 把最终状态改为 EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
1.2. finishCompletion
//删除并发出所有等待线程的信号,调用done(),并取消进行中的方法
private void finishCompletion() {
    // assert state > COMPLETING;
    // 从 waiters 末尾开始遍历,for 自旋直到 CAS 成功。
    for (WaitNode q; (q = waiters) != null;) {
        // 使用 CAS 把 waiters 设置为 null,和 awaitDone 和 removeWatier 方法竞争
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 自旋唤醒所有线程
            for (;;) {
                Thread t = q.thread;
                // 唤醒线程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 这个一个空方法,主要用于继承重写,这也是模板模式的体现
    done();
    callable = null;        // to reduce footprint
}

done 是一个模板方法,之前的一篇文章中讲到的 ExecutorCompletionService 有用到它。

1.3. handlePossibleCancellationInterrupt

它的作用就是当接收来自 cancel 的中断时,确保 task 还在运行,具体为什么我也不太清除。

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

2. runAndReset

顾名思义这个方法有两个操作 run 和 reset。它有一个 boolean 的返回值,只有在两个操作都成功的情况下才会返回 true,如果返回了 false ,有一种情况是它还是会执行了一次 task。

runAndReset 和 run 方法最大的区别是它不处理结果,也没有唤醒等待中的线程的操作。所以使用 runAndReset 的 task 最好没有返回值。还有就是使用了 runAndReset 就不要用 get 方法了(此时 get 的作用只剩下阻塞这个功能)。

一旦 cancel 就不能在次运行了。它被设计出来的用意就是执行多次的任务。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

3. get 和 get(time)

get 和 get(time) 的差别只是增加了等待时间。在代码层面,这两者几乎一样。

 public V get() throws InterruptedException, ExecutionException {
     int s = state;
     if (s <= COMPLETING)
         s = awaitDone(false, 0L);
     return report(s);
 }

 public V get(long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException {
     if (unit == null)
         throw new NullPointerException();
     int s = state;
     if (s <= COMPLETING &&
         (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
         throw new TimeoutException();
     return report(s);
 }

// 根据 awaitDone 返回状态返回结果或抛出异常
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 顺利运行
    if (s == NORMAL)
        return (V)x;
    // 取消
    if (s >= CANCELLED)
        throw new CancellationException();
    // task 执行过程中出现异常
    throw new ExecutionException((Throwable)x);
}

可以看到几乎一模一样。最重要的方法是 awaitDone 方法。awaitDone 的作用就是把获取结果的线程放入等待链表,然后阻塞,直至被中断,计算完成或出现异常。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;//用于计时
    WaitNode q = null;
    boolean queued = false;
    for (;;) {//自旋
        //如果已经被中断,则 removeWaiter,抛出中断异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
      
        int s = state;//获取该任务的运行状态,state 有7个值
        // s > COMPLETING,说明 task 已经结束
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 马上就要结束,则让出cpu
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 初始化 WaitNode
        else if (q == null)
            q = new WaitNode();
        // 是否已入队,没有则把 WaitNode 接到末尾
        else if (!queued)
            // 和 finishCompletion 和 removeWaiter 竞争
            // 1. finishCompletion  竞争成功,说明 state 已经 > COMPLETING 则下次循环就会退出
            // 2. removeWaiter  竞争成功,说明 waiters 变化了,下一次循环再次竞争
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        // 如果使用了计时,则判断是否超时,如果超时则以出 WaitNode 并立即返回无需等待结果,否则阻塞 nanos
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞,直到被唤醒(正常完成 || 异常 || 中断)
            LockSupport.park(this);
    }
}

其中 state 有7个值,仅在set,setException和cancel方法中转换为中止状态。在完成时,它是一个瞬时值 COMPLETING(在 outcome 设置时)或 INTERRUPTING (中断时)。从这些中间状态到最终状态的转换使用便宜的有序/惰性写入,因为值是唯一的,并且无法进一步修改。分别是:

state 描述
1. NEW = 0 构造方法创建时
2. COMPLETING = 1 这是一个中间态,完成时和出现异常时有使用到
3. NORMAL = 2 完成运行时的最终状态
4. EXCEPTIONAL = 3 异常时的最终状态
5. CANCELLED = 4 已取消
6. INTERRUPTING = 5 中断中
7. INTERRUPTED = 6 已中断

可能的 state 转换:

  1. NEW -> COMPLETING -> NORMAL
  2. NEW -> COMPLETING -> EXCEPTIONAL
  3. NEW -> CANCELLED
  4. NEW -> INTERRUPTING -> INTERRUPTED
3.1. removeWaiter
// 半机翻:尝试取消链接超时或中断的等待节点以避免堆积垃圾。 内部节点的拼接
// 没有CAS,因为这对释放者无论如何遍历都没有影响。 为了避免因未拼接而造成的影响
// 删除的节点,如果出现明显的情况,则遍历该列表根。 当节点很多时,这很慢,但是我们没有
// 期望列表足够长以超过更高的开销
计划。
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            //遍历整个链表
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // q.thread != null,则把 q 当作前一个节点,遍历下一个节点
                if (q.thread != null)
                    pred = q;
                // q.thread == null && pred != null,表示当前节点不是第一个节点,是一个中间节点
                // 这里没有使用 CAS,如果出现多个线程同时遍历,前一个节点变为null,则重新从头遍历
                // 为什么没有使用 CAS 因为作者的想法时这个链表不会太长,所以我们使用时不应该使这个量表太长
                // 操作:把下一个节点连接到前一个节点的后面
                else if (pred != null) {
                    pred.next = s;// 把 s 连接到 pred 后面
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // q.thread == null && pred == null,表示第一个节点的 thread == null,
                // 这里使用 CAS,因为可能多个线程在操作
                // 操作:把下一个节点设置为末尾节点,如果竞争失败则重新从头遍历
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

这个方法告诉我们使用这个链表时,不应该使得链表太长。

既然前面说到 Future 是要给可取消的计算,我们接下来就来看下 cancel 方法:

4. cancel

中断运行中(state == NEW)的线程,但是真正的中断还是要配合你的 callable 的实现,就是是否对中断信号进行了检测并作出相应的处理

public boolean cancel(boolean mayInterruptIfRunning) { 
    // state != NEW 或 state == NEW CAS 竞争失败就直接返回 false 取消失败,说明线程已经成功运行或出现了异常
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                // runner 在 run 方法的最后会被置为 null
                if (t != null)
                    t.interrupt();// 发出中断信号
            } finally { // final state
                
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒线程,请看1.2节
        finishCompletion();
    }
    return true;
}

好了 Future 到这里就讲完了,如果有错误的请指正。

上一篇下一篇

猜你喜欢

热点阅读