技术栈

2019-04-01——Java并发包 Future

2019-04-01  本文已影响0人  烟雨乱平生

Future体系类图

Future框架.png

Future

Future保存异步计算的结果,该接口提供以下的方法

方法 描述
cancel(boolean mayInterruptIfRunning) 试图取消执行的任务,参数为true时直接中断正在执行的任务,否则直到当前任务执行完成,成功取消后返回true,否则返回false
isCancel() 判断任务是否在正常执行完前被取消的,如果是则返回true
isDone() 判断任务是否已完成
get() 等待计算结果的返回(阻塞方法),如果计算被取消了则抛出异常
get(long timeout,TimeUtil unit) 设定计算结果的返回时间,如果在规定时间内没有返回计算结果则抛出异常

RunnableFuture

RunnableFuture接口继承了Future和Runnable两个接口

FutureTask

FutureTask实现了RunnableFuture接口,即FutureTask对象既是一个Future接口的实例又是一个Runnable接口的实例。

状态 说明
NEW 表示这是一个新的任务,或者还没有执行完的任务,是初始状态。
COMPLETING 表示任务执行结束(正常执行结束,或者发生异常结束),但是还没有将结果保存到outcome中。是一个中间状态。
NORMAL 示任务正常执行结束,并且已经把执行结果保存到outcome字段中。是一个最终状态。
EXCEPTIONAL 表示任务发生异常结束,异常信息已经保存到outcome中,这是一个最终状态。
CANCELLED 任务在新建之后,执行结束之前被取消了,但是不要求中断正在执行的线程,也就是调用了cancel(false),任务就是CANCELLED状态
INTERRUPTING 任务在新建之后,执行结束之前被取消了,并要求中断线程的执行,也就是调用了cancel(true),这时任务状态就是INTERRUPTING
INTERRUPTED 调用cancel(true)取消异步任务,会调用interrupt()中断线程的执行,然后状态会从INTERRUPTING变到INTERRUPTED。
变化流程 说明
NEW -> COMPLETING -> NORMAL 正常执行结束的流程
NEW -> COMPLETING -> EXCEPTIONAL 执行过程中出现异常的流程
NEW -> CANCELLED 被取消,即调用了cancel(false)
NEW -> INTERRUPTING -> INTERRUPTED 被中断,即调用了cancel(true)
构造函数 说明
FutureTask(Callable<V> callable) 传入一个Callable对象,可以通过get方法获取到结果
FutureTask(Runnable runnable, V result) 传入一个Runnable对象和一个返回的结果,该Runnable对象会通过Executors.callable(runnable, result)方法封装成Callable对象;result作为get方法返回的结果
成员对象 说明
Callable<V> callable 一个Callable类型的变量,封装了计算任务,可获取计算结果。
Object outcome 用来保存计算任务的返回结果,或者执行过程中抛出的异常。
volatile Thread runner 指向当前在运行Callable任务的线程
volatile WaitNode waiters WaitNode是FutureTask的内部类,表示一个阻塞队列,如果任务还没有执行结束,那么调用get()获取结果的线程会阻塞,在这个阻塞队列中排队等待。
public void run() {
    /*1.判断状态是否是NEW,不是NEW,说明任务已经被其他线程执行,甚至执行结束,或者被取消了,直接返回*/
    /*2.调用CAS方法,判断runnerOffset为null的话,就将当前线程保存到runnerOffset中(即把当前的线程赋值给runner),如果设置runnerOffset失败,就直接返回*/
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                /*执行Callable任务,结果保存到result中*/
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                /*如果执行任务过程中发生异常,将调用setException()设置异常*/
                result = null;
                ran = false;
                setException(ex);
            }
            /*任务正常执行结束调用set(result)保存结果*/
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        /*任务执行结束,runner设置为null,表示当前没有线程在执行这个任务了*/
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        /*读取状态,判断是否在执行的过程中,被中断了,如果被中断,处理中断*/
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void setException(Throwable t) {
    /*通过CAS操作将当前线程的stateOffset(即state)从NEW置为COMPLETING*/
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        /*将异常信息保存到outcome*/
        outcome = t;
        /*将state改成EXCEPTIONAL*/
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
protected void set(V v) {
    /*将当前任务的状态从NEW改为COMPLETING*/
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        /*保存执行的结果*/
        outcome = v;
        /*将当前任务的状态改成NORMAL*/
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

一般情况下,执行任务的线程和获取结果的线程不会是同一个,当我们在主线程或者其他线程中,获取计算任务的结果时,就会调用get方法,如果这时计算任务还没有执行完成,调用get()的线程就会阻塞等待。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    /*如果任务还未执行完成(任务状态为NEW,从run方法中可以看到只有任务执行结束,或者发生异常的时候,state才会被设置成COMPLETING)*/
    if (s <= COMPLETING)
        /*调用awaitDone(false, 0L),进入阻塞状态*/
        s = awaitDone(false, 0L);
    /*返回结果*/
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    /*自驱*/
    for (;;) {
        /*如果调用线程被阻断了就从等待的线程栈中移除这个等待节点,然后抛出中断异常*/
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        /*获取当前任务的状态*/
        int s = state;
        /*如果当前任务已经执行完成(正常或异常结束),不在阻塞,直接返回任务状态*/
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        /*如果任务结束,但是最终结果还没保存下来,可以暂时让出CPU*/
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        /*如果等待节点q=null,就创建一个等待节点*/
        else if (q == null)
            q = new WaitNode();
        /*如果这个等待节点还没有加入等待队列,就加入队列头*/
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        /*如果设置了超时等待时间*/
        else if (timed) {
            nanos = deadline - System.nanoTime();
            /*如果超出了等待时间,停止阻塞,返回当前任务的状态*/
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            /*阻塞特定时间*/
            LockSupport.parkNanos(this, nanos);
        }
        /*阻塞等待唤醒*/
        else
            LockSupport.park(this);
    }
}
private V report(int s) throws ExecutionException {
    /*获取当前任务的结果*/
    Object x = outcome;
    /*如果任务的状态是正常完成,则返回结果*/
    if (s == NORMAL)
        return (V)x;
    /*如果任务被取消了,则抛出取消异常*/
    if (s >= CANCELLED)
        throw new CancellationException();
    /*否则异常结束,outcome里面保存的是异常结果,将异常抛出*/
    throw new ExecutionException((Throwable)x);
}
public boolean cancel(boolean mayInterruptIfRunning) {
    /*1.判断当前任务的状态是否为新建状态,如果不是说明任务已经正常或异常结束了,直接返回取消失败*/
    /*2.如果任务是NEW状态,根据mayInterruptIfRunning尝试将任务状态改成CANCELLED或者INTERRUPTING,更改失败返回false*/
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        /*如果取消的时候尝试打断任务的执行*/
        if (mayInterruptIfRunning) {
            try {
                /*获取执行当前任务的线程*/
                Thread t = runner;
                if (t != null)
                    /*调用interrupt方法打断线程*/
                    t.interrupt();
            } finally { // final state
                /*将任务的状态改为INTERRUPTED*/
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  • finishCompletion
    这个方法在run方法和cancel方法中都有调用,分析代码可以知道这个方法是在任务的状态变成终态的时候会被调用。该方法主要做了三件事:
  • 遍历waiters等待队列,调用LockSupport.unpark(t)唤醒等待返回结果的线程,释放资源。
  • 调用done(),这个方法什么都没有做,不过子类可以实现这个方法,做一些额外的操作。
  • 设置callable为null,callable是FutureTask封装的任务,任务执行完,释放资源。

上一篇 下一篇

猜你喜欢

热点阅读