FutureTask源码走读

2017-04-03  本文已影响18人  忘净空

FutureTask类图

从类图可以看出FutureTask简介实现了Runnable、Future接口,我们知道Future用于表示异步计算的结果,所以通过它我们可以获得线程的执行结果。如何获得线程的执行结果呢?

FutureTask获取执行结果

一般我们按照下面的方式使用FutureTask

//1. 创建FutureTask
FutureTask futureTask = new FutureTask(new CallableTest());
//2. 启动线程
new Thread(futureTask).start();
//3 获取线程执行结果
String result = (String) futureTask.get();

源码走读

//创建FutureTask对象
public FutureTask(Callable<V> callable) {
if (callable == null)
    throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}

//调用run方法,执行我们的业务逻辑
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //我们重写的call的方法的执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)//call方法执行完成执行唤醒操作
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

//执行call方法的时候,我们主线程接着执行get()方法
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //阻塞主方法的逻辑
        s = awaitDone(false, 0L);
    return report(s);
}

//未被唤醒会一直等待直到超时或知道被中断
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    //等待队列
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //阻塞操作(有超时时间)
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞操作
            LockSupport.park(this);
    }
}

//run()中的唤醒操作
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //执行后结果赋值给outcome
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

//依次唤醒阻塞的线程
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; 
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }

从上面的分析可以看出:异步计算线程启动后,主线程(或者其他调用get()方法的线程)将被放在一个等待对列中,同时被阻塞(通过LockSupport类的park方法),知道异步计算线程执行完成后,等待队列中的线程将被依次唤醒,并且或得计算结果。

上一篇下一篇

猜你喜欢

热点阅读