4-FutureTask

2020-02-11  本文已影响0人  鹏程1995

概述

引入

在上面讲CountDownLatch时我们介绍了几种方法实现线程的先后执行,这种存在是很必要的,很多时候我们在执行某些操作前需要预处理。但是很多时候我们不仅需要子线程做一些处理,主线程还需要子线程处理的结果进行后续操作。于是我们就有了Callable接口,他和Runnable接口一样,实现此接口的类可以被用来创建一个Thread并在其中被执行。我们为了方便的取回Callable计算出的值,我们又有了Future接口,专门负责协调主线程【一条或者多条线程,可以在多个线程中从一个子线程中同时取值】和子线程的执行,以达到主线程的等待效果。

我们本文介绍的FutureTask实现了上面的FutureCallable接口,实现了上面我们的需求:主线程获得子线程执行结果。

摘要

本文内容如下:

  1. 介绍了FutureTask的实现原理
  2. 介绍了FutureTask的用法
  3. FutureTask的工作过程中的具体机理进行了概述

类介绍

类定位

FutureTask实现了FutureCallable两个接口。它实现的功能罗列如下:

  1. 完成了以Runnable/Callable为入参,经过内部机制重新包装成自己的实例的统一化过程。【自己也实现了Runnable,可以被执行】
  2. 完成了协调主线程对子线程的等待的功能
  3. 完成了对多个等待线程的协调功能
  4. 完整了取消、查询状态等基础功能

注意

FutureTask实现的功能中并不包括启动一个子线程执行计算,需要根据项目环境依赖线程池或者自己创建一个子线程完成操作。

源码解读

Runnable/Callable的统一包装

数据保存

/** 要调用的逻辑,不管入参是 Callable 还是 Runnable 都最终封装成这个*/
private Callable<V> callable;

/**
  * callable 执行的结果,get()方法根据状态直接从这里取值返回。 
  * 这里保存的或者是 V 类型的返回值、或者是抛出的异常。
  * 在取用处理时要结合状态值
  **/
private Object outcome; 

/** 执行逻辑的子线程,在调用完成后将此值置空 */
private volatile Thread runner;

入参转化

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

其实即使入参是Runnable,转化也很简单,只是将result记录下来,在调用run()完成后即使返回即可。

注意

我们介绍的里面很容明白:

其中,因为我们初始化FutureTask是在主线程中,为了记录下runner,需要我们在FutureTask.run()方法中对runner进行赋值。毕竟执行run()方法的才是子线程,才能通过Thread.currentThread()获得目标线程并进行保存。

根据使用场景:

上面三个变量严格意义上都是只有一个线程来修改的,即使可能多个线程同时读取,但是是在我们修改结束后才读,所以不需要进行加锁和volatile标注。

协调等待

总览

我们在协调等待中主要依据两个主题介绍:一个是子线程的执行及结果赋值操作,一个是主线程的等待及取值操作。

子线程的执行及结果赋值主要的实现原理其实就是通过Thread.start()调用Runnable.run()方法执行其中的逻辑,FutureTask.run()callable进行了执行及对一些执行状态进行了封装、对一些执行结果进行了保存。执行结束后唤醒阻塞的栈中的线程让其进行取值。

主线程的等待及取值操作主要是对state进行检验,直到其指向结束返回,否则进栈阻塞等待。【考虑到多个线程取值,所以用的栈】

基础数据结构

/**
  * 任务的执行状态
  *
  * 使用 volatile 标注的原因:
  *     1. 等待的主线程和执行任务的子线程共享 state ,要对其进行不断的访问以进行状态,不能让编译器
  *         修改 state 相关操作顺序
  *     2. 修改 state 值使用了 Unsafe.putOrderedInt ,这个要求变量为 volatile 
  *
  * state 标识了任务执行的状态,在创建 FutureTask 时状态为 NEW ,在执行完成但是还没有对结果赋值
  * 保存时状态为 COMPLETING 。在有线程调用取消时,正在打断时状态为 INTERRUPTING ,其余的都是结束
  * 状态。状态流转如下:
  * NEW -> COMPLETING -> NORMAL
  * NEW -> COMPLETING -> EXCEPTIONAL
  * NEW -> CANCELLED
  * NEW -> INTERRUPTING -> INTERRUPTED
  **/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** 调用了 get() 线程的阻塞的栈 */
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

子线程方面:执行

介绍

主要介绍FutureTaskrun()的实现,以及任务完成后的后续处理操作,如:结果保存、状态流转、主线程唤醒等等。

源码

任务逻辑操作
public void run() {
    // 如果
    //  状态不是初始状态【被提前取消了】
    //  runner 不是空,是不是有多个 Thread 借用这个实例并启动了,拒绝重复操作
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    // 调用 caller.call() 
    // 正常结束将结果保存
    // 抛出异常就将异常保存
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING) // 执行过程中调用了取消函数
            handlePossibleCancellationInterrupt(s);
    }
}
/**
  * 保存调用结果,并修改状态值
  *
  * 保存结束后调用 finishCompletion() 通知等待线程
  **/
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

/**
  * 保存调用抛出的 Exception,并修改状态值
  *
  * 保存结束后调用 finishCompletion() 通知等待线程
  **/
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

后续处理操作

/**
  * 唤醒栈中所有的等待线程
  *
  **/
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 直接把栈顶用一个临时变量记录下来然后去掉栈顶的引用
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 不断唤醒栈中有意义的等待节点
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // 线程结束后的额外操作,提供此方法以方便用户定制
    done();

    callable = null;        // 防止被重复调用子线程逻辑
}

/**
  *  子线程执行结束并唤醒所有等待线程后执行的方法
  *  专门用来方便开发者根据环境定制
  *
  **/
protected void done() { }

private void handlePossibleCancellationInterrupt(int s) {
        // 相应中断,放弃CPU轮转
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 

        // 我们假定我们放弃时间片等待之后 state == INTERRUPTED;
}

主线程方面:等待及取值

介绍

主要介绍FutureTask提供的获得子线程操作结果的get()方法。

源码

/**
  * 检测状态,如果还未完成就调用 awaitDone()等待。如果是完成状态,直接调用 report() 返回结果
  *
  **/
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
/**
  * 检测状态,限时,没得说
  *
  **/
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

/**
  * 返回结果,根据状态看是应该返回 V 还是抛出取消的异常,还是抛出线程中自己抛出的异常
  **/
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

/**
  * 等待子线程完成任务。等待的思路很简单:
  * 1. 如果中断了或者有限时且超时了就停止等待
  * 2. 如果快完成了 COMPLETING ,就放弃时间片等一下再来检查
  * 3. 进队、阻塞,等待被唤醒
  *
  * 这里有个问题:
  * “中断了或者有限时且超时了就停止等待”会专门调用方法遍历链表并删除多余节点,以方便垃圾回收,中断
  * 超时可能会存在大量的废弃节点,在大型应用场景中如果不即使处理,会造成内存的浪费。
  * 
  * “进队、阻塞,等待被唤醒”,如果进队后还没来得及阻塞发现ok了,那不会对节点进行删除,原因很简单,
  *  因为状态ok了,马上要进行整个队列的所有节点的唤醒、销毁,这种工作就直接委托
  *  给了 finishCompletion() 方法。
  *
  *
  **/
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            // 其实就是一个 CAS 的进栈操作
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

/**
  * 删除废弃的节点
  **/
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

取消及查询结果功能

很简单,直接上源码得了。

public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

使用示例

自己创建子线程并使用

/**
 * @author lipengcheng3 Created date 2019-02-13 10:05
 */
public class FutureLearn {
    static class TestCallable implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            int sum=0;
            for (int i=0;i<10;i++){
                Thread.sleep(100);
                sum += i;
            }
            return sum;
        }
    }
    public static void main(String[] args){
        FutureTask<Integer> t1 = new FutureTask<>(new TestCallable());
        new Thread(t1).start();
        try {
            System.out.println(t1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}


借助线程池

/**
 * @author lipengcheng3 Created date 2019-02-13 10:05
 */
public class FutureLearn1 {
    static class TestCallable implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            int sum=0;
            for (int i=0;i<10;i++){
                Thread.sleep(100);
                sum += i;
            }
            return sum;
        }
    }
    public static void main(String[] args){
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<Integer> t1 = new FutureTask<>(new TestCallable());
        es.submit(t1);
        try {
            System.out.println(t1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            es.shutdown();
        }
    }
}

总结

唯一的区别就是生成调用FutureTask的子线程的方法,数量少就自己new,数量多尽量用线程池,方便线程的充分利用,减少频繁进行线程创建回收引起的开销。

当然,你也可以自行继承FutureTask并对钩子done()进行实现以实现根据自己的调用环境定制FutureTask

问题

用了volatile,为什么呢?等后续解答。

扩展

参考文献

上一篇 下一篇

猜你喜欢

热点阅读