jdkJava-多线程程序员

Java - Callable、Future、FutureTas

2021-01-02  本文已影响0人  夹胡碰

Callable、Future、FutureTask组合使用可以阻塞获取多线程执行的返回值,是Runnable功能的拓展。

1. 原生使用

public class CallAbleTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask<>(new MyThread());
        Thread thread = new Thread(futureTask);
        thread.start();
        futureTask.run();
        Thread.sleep(2000);
        Object o = futureTask.get();
        System.out.println(o);
        Thread.sleep(3000);
        Object o2 = futureTask.get();
        System.out.println(o2);
    }

    public static class MyThread implements Callable{

        @Override
        public Object call() throws Exception {
            System.out.println("Thread start");
            Thread.sleep(3000);
            System.out.println("Thread end");
            return "返回值";
        }
    }
}
out =>
a
end
啊哈
啊哈

2. 线程池使用

ExecutorService executorService = Executors.newCachedThreadPool();
Future submit = executorService.submit(new MyThread());
Object o1 = submit.get();
System.out.println(o1);
out => 
Thread start
Thread end
返回值

3. 源码解析

Thread执行start方法后,异步执行FutureTask的run方法,result = c.call();阻塞执行,执行完成之后set(result);进行结果赋值和唤醒get()阻塞线程的工作

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 阻塞执行
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result); // 赋值result并唤醒get阻塞线程
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // 结果赋值
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) { // 遍历get阻塞等待队列
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 唤醒get阻塞等待队列的线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); // 阻塞,加入等待队列挂起
    return report(s); // 结果返回
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode(); // 创建当前线程的waitNode
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // cas处理,waitNode放在等待队列队首
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos); // 挂起线程
        }
        else
            LockSupport.park(this); // 挂起线程
    }
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
上一篇下一篇

猜你喜欢

热点阅读