Java高级交流

Java 并发编程——Callable+Future+Futur

2018-05-08  本文已影响93人  Java小生

项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable实现。

Callable接口                                                                                                    

Callable接口Runable接口可谓是兄弟关系,只不过Callable是带返回值的。 

public interface Callable<V> {

    /**

     * Computes a result, or throws an exception if unable to do so.

     *

     * @return computed result

     * @throws Exception if unable to compute a result

     */

    V call() throws Exception;

}

Future 接口                                                                                                       

接口函数及含义 :public interface Future<V>

 boolean  cancel(boolean mayInterruptIfRunning)

取消当前执行的任务,如果已经执行完毕或者已经被取消/由于某种原因不能被取消 则取消任务失败。

参数mayInterruptIfRunning: 当任务正在执行,如果参数为true ,则尝试中断任务,否则让任务继续执行知道结束。

 

boolean isCancelled()

Returns {@code true} if this task was cancelled before it completed

* normally.

boolean isDone();

/**

 * Returns {@code true} if this task completed.

 *

 * Completion may be due to normal termination, an exception, or

 * cancellation -- in all of these cases, this method will return

 * {@code true}.

 *

 * @return {@code true} if this task completed

 */

V get() throws InterruptedException, ExecutionException;

/**

 * Waits if necessary for the computation to complete, and then

 * retrieves its result.

 *

 * @return the computed result

 * @throws CancellationException if the computation was cancelled

 * @throws ExecutionException if the computation threw an

 * exception

 * @throws InterruptedException if the current thread was interrupted

 * while waiting

 */

由注释可以看出,当没有执行完成时,需要等待任务执行完成了才会将计算结果返回。

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

Waits if necessary for at most the given time for the computation

* to complete, and then retrieves its result, if available.

如果等待的时间超过设置的时间则会报 TimeoutException异常

FutureTask                                                                                                     

public class FutureTask<V> implements RunnableFuture<V>

由定义可以看出它实现了RunnableFuture接口,那么这个接口又是什么呢?看下面的接口定义,其实很简单  

public interface RunnableFuture<V> extends Runnable, Future<V> {

    /**

     * Sets this Future to the result of its computation

     * unless it has been cancelled.

     */

    void run();

}

再回到FutureTask,它其实就是实现了Runnable和Future接口,FutureTask的执行是 状态转换的过程,源码中有七种状态如下:

  * Possible state transitions:

     * NEW -> COMPLETING -> NORMAL

     * NEW -> COMPLETING -> EXCEPTIONAL

     * NEW -> CANCELLED

     * NEW -> INTERRUPTING -> INTERRUPTED

     */

    private volatile int state;

    private static final int NEW          = 0;

    private static final int COMPLETING   = 1;

    private static final int NORMAL       = 2;

    private static final int EXCEPTIONAL  = 3;

    private static final int CANCELLED    = 4;

    private static final int INTERRUPTING = 5;

    private static final int INTERRUPTED  = 6;

当FutureTask刚刚被创建时,它的状态是NEW,其它状态查看源码。

其它成员变量:

 /** The underlying callable; nulled out after running */

    private Callable<V> callable;

    /** The result to return or exception to throw from get() */

    private Object outcome; // non-volatile, protected by state reads/writes

    /** The thread running the callable; CASed during run() */

    private volatile Thread runner;

    /** Treiber stack of waiting threads */

    private volatile WaitNode waiters;

callable是待执行的任务,FutureTask 的 run()函数中执行callable中的任务。

outcome : 是callable的执行结果,当正常执行完成后会将结果set到outcome中

runner:是执行callable 的线程

WaitNode : 是的受阻塞的线程链表,当cancel一个任务后,阻塞的线程会被唤醒。


构造函数:

public FutureTask(Callable<V> callable) {

        if (callable == null)

            throw new NullPointerException();

        this.callable = callable;

        this.state = NEW;       // ensure visibility of callable

    }

public FutureTask(Runnable runnable, V result) {

    this.callable = Executors.callable(runnable, result);

    this.state = NEW;       // ensure visibility of callable

}

从构造函数可以看出,不光可以通过callable构造FutureTask还可以通过Runnable接口转化为callable来构造。关键函数为黄色标记部分,Executors中的实现源码如下:

/**

     * A callable that runs given task and returns given result.

     */

    private static final class RunnableAdapter<T> implements Callable<T> {

        private final Runnable task;

        private final T result;

        RunnableAdapter(Runnable task, T result) {

            this.task = task;

            this.result = result;

        }

        public T call() {

            task.run();

            return result;

        }

    }

这里面不懂result到底有什么意义,明明就是预先设置好的。

其它具体的方法说明这里不再细说,里面用到了很多sun.misc.Unsafe中的方法以及其他SDK底层接口,后续有时间再学习。下面贴出了整个源码及说明

public class FutureTask<V> implements RunnableFuture<V> {

    /*

     * Revision notes: This differs from previous versions of this

     * class that relied on AbstractQueuedSynchronizer, mainly to

     * avoid surprising users about retaining interrupt status during

     * cancellation races. Sync control in the current design relies

     * on a "state" field updated via CAS to track completion, along

     * with a simple Treiber stack to hold waiting threads.

     *

     * Style note: As usual, we bypass overhead of using

     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.

     */

    /**

     * The run state of this task, initially NEW.  The run state

     * transitions to a terminal state only in methods set,

     * setException, and cancel.  During completion, state may take on

     * transient values of COMPLETING (while outcome is being set) or

     * INTERRUPTING (only while interrupting the runner to satisfy a

     * cancel(true)). Transitions from these intermediate to final

     * states use cheaper ordered/lazy writes because values are unique

     * and cannot be further modified.

     *

     * Possible state transitions:

     * NEW -> COMPLETING -> NORMAL

     * NEW -> COMPLETING -> EXCEPTIONAL

     * NEW -> CANCELLED

     * NEW -> INTERRUPTING -> INTERRUPTED

     */

    private volatile int state;

    private static final int NEW          = 0;

    private static final int COMPLETING   = 1;

    private static final int NORMAL       = 2;

    private static final int EXCEPTIONAL  = 3;

    private static final int CANCELLED    = 4;

    private static final int INTERRUPTING = 5;

    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */

    private Callable<V> callable;

    /** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */

    private Object outcome; // non-volatile, protected by state reads/writes

    /** 当前运行Run方法的线程  */

    private volatile Thread runner;

    /** Treiber stack of waiting threads */

    private volatile WaitNode waiters;

    /**

     * Returns result or throws exception for completed task.

     *

     * @param s completed state value

     */

    @SuppressWarnings("unchecked")

    private V report(int s) throws ExecutionException {

        Object x = outcome;

        if (s == NORMAL)

            return (V)x;

        if (s >= CANCELLED)

            throw new CancellationException();

        throw new ExecutionException((Throwable)x);

    }

    /**

     * Creates a {@code FutureTask} that will, upon running, execute the

     * given {@code Callable}.

     *

     * @param  callable the callable task

     * @throws NullPointerException if the callable is null

     */

    public FutureTask(Callable<V> callable) {

        if (callable == null)

            throw new NullPointerException();

        this.callable = callable;

        this.state = NEW;       // ensure visibility of callable

    }

    /**

     * Creates a {@code FutureTask} that will, upon running, execute the

     * given {@code Runnable}, and arrange that {@code get} will return the

     * given result on successful completion.

     *

     * @param runnable the runnable task

     * @param result the result to return on successful completion. If

     * you don't need a particular result, consider using

     * constructions of the form:

     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}

     * @throws NullPointerException if the runnable is null

     */

    public FutureTask(Runnable runnable, V result) {

        this.callable = Executors.callable(runnable, result);

        this.state = NEW;       // ensure visibility of callable

    }

    //判断任务是否已取消(异常中断、取消等)

    public boolean isCancelled() {

        return state >= CANCELLED;

    }

   /**

    判断任务是否已结束(取消、异常、完成、NORMAL都等于结束)

    **

    public boolean isDone() {

        return state != NEW;

    }

    /**

   mayInterruptIfRunning用来决定任务的状态。

                   true : 任务状态= INTERRUPTING = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行

                   false:CANCELLED    = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行

    **/

    public boolean cancel(boolean mayInterruptIfRunning) {

        if (state != NEW)

            return false;

        if (mayInterruptIfRunning) {

            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))

                return false;

            Thread t = runner;

            if (t != null)

                t.interrupt();

            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state

        }

        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))

            return false;

        finishCompletion();

        return true;

    }

    /**

     * @throws CancellationException {@inheritDoc}

     */

    public V get() throws InterruptedException, ExecutionException {

        int s = state;

        //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程

        if (s <= COMPLETING)

            s = awaitDone(false, 0L);

        return report(s);

    }

    /**

     * @throws CancellationException {@inheritDoc}

     */

    public V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

        if (unit == null)

            throw new NullPointerException();

        int s = state;

        if (s <= COMPLETING &&

            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

            throw new TimeoutException();

        return report(s);

    }

    /**

     * Protected method invoked when this task transitions to state

     * {@code isDone} (whether normally or via cancellation). The

     * default implementation does nothing.  Subclasses may override

     * this method to invoke completion callbacks or perform

     * bookkeeping. Note that you can query status inside the

     * implementation of this method to determine whether this task

     * has been cancelled.

     */

    protected void done() { }

    /**

    该方法在FutureTask里只有run方法在任务完成后调用。

    主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。

    由该方法可以得知:

    COMPLETING : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get

    NORMAL    : 任务彻底执行完成

    **/

    protected void set(V v) {

        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

            outcome = v;

            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

            finishCompletion();

        }

    }

    /**

     * Causes this future to report an {@link ExecutionException}

     * with the given throwable as its cause, unless this future has

     * already been set or has been cancelled.

     *

     * <p>This method is invoked internally by the {@link #run} method

     * upon failure of the computation.

     *

     * @param t the cause of failure

     */

    protected void setException(Throwable t) {

        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

            outcome = t;

            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

            finishCompletion();

        }

    }

    /**

    由于实现了Runnable接口的缘故,该方法可由执行线程所调用。

    **/

    public void run() {

        //只有当任务状态=new时才被运行继续执行

        if (state != NEW ||

            !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                         null, Thread.currentThread()))

            return;

        try {

            Callable<V> c = callable;

            if (c != null && state == NEW) {

                V result;

                boolean ran;

                try {

                    //调用Callable的Call方法

                    result = c.call();

                    ran = true;

                } catch (Throwable ex) {

                    result = null;

                    ran = false;

                    setException(ex);

                }

                if (ran)

                    set(result);

            }

        } finally {

            // runner must be non-null until state is settled to

            // prevent concurrent calls to run()

            runner = null;

            // state must be re-read after nulling runner to prevent

            // leaked interrupts

            int s = state;

            if (s >= INTERRUPTING)

                handlePossibleCancellationInterrupt(s);

        }

    }

    /**

   如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。

   所以该方法可以重复执行N次。不过不能直接调用,因为是protected权限。

    **/

    protected boolean runAndReset() {

        if (state != NEW ||

            !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                         null, Thread.currentThread()))

            return false;

        boolean ran = false;

        int s = state;

        try {

            Callable<V> c = callable;

            if (c != null && s == NEW) {

                try {

                    c.call(); // don't set result

                    ran = true;

                } catch (Throwable ex) {

                    setException(ex);

                }

            }

        } finally {

            // runner must be non-null until state is settled to

            // prevent concurrent calls to run()

            runner = null;

            // state must be re-read after nulling runner to prevent

            // leaked interrupts

            s = state;

            if (s >= INTERRUPTING)

                handlePossibleCancellationInterrupt(s);

        }

        return ran && s == NEW;

    }

    /**

     * Ensures that any interrupt from a possible cancel(true) is only

     * delivered to a task while in run or runAndReset.

     */

    private void handlePossibleCancellationInterrupt(int s) {

        // It is possible for our interrupter to stall before getting a

        // chance to interrupt us.  Let's spin-wait patiently.

        if (s == INTERRUPTING)

            while (state == INTERRUPTING)

                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from

        // cancel(true).  However, it is permissible to use interrupts

        // as an independent mechanism for a task to communicate with

        // its caller, and there is no way to clear only the

        // cancellation interrupt.

        //

        // Thread.interrupted();

    }

    /**

     * Simple linked list nodes to record waiting threads in a Treiber

     * stack.  See other classes such as Phaser and SynchronousQueue

     * for more detailed explanation.

     */

    static final class WaitNode {

        volatile Thread thread;

        volatile WaitNode next;

        WaitNode() { thread = Thread.currentThread(); }

    }

    /**

    该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.

    **/

    private void finishCompletion() {

        // assert state > COMPLETING;

        for (WaitNode q; (q = waiters) != null;) {

            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

                for (;;) {

                    Thread t = q.thread;

                    if (t != null) {

                        q.thread = null;

                        LockSupport.unpark(t);

                    }

                    WaitNode next = q.next;

                    if (next == null)

                        break;

                    q.next = null; // unlink to help gc

                    q = next;

                }

                break;

            }

        }

        done();

        callable = null;        // to reduce footprint

    }

    /**

    阻塞等待任务执行完成(中断、正常完成、超时)

    **/

    private int awaitDone(boolean timed, long nanos)

        throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;

        WaitNode q = null;

        boolean queued = false;

        for (;;) {

            /**

            这里的if else的顺序也是有讲究的。

            1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中)

            2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。

            3.如果任务状态=COMPLETING,证明该任务处于已执行完成,正在切换任务执行状态,CPU让出片刻即可

            4.q==null,则证明还未创建节点,则创建节点

            5.q节点入队

            6和7.阻塞

            **/

            if (Thread.interrupted()) {

                removeWaiter(q);

                throw new InterruptedException();

            }

            int s = state;

            if (s > COMPLETING) {

                if (q != null)

                    q.thread = null;

                return s;

            }

            else if (s == COMPLETING) // cannot time out yet

                Thread.yield();

            else if (q == null)

                q = new WaitNode();

            else if (!queued)

                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

                                                     q.next = waiters, q);

            else if (timed) {

                nanos = deadline - System.nanoTime();

                if (nanos <= 0L) {

                    removeWaiter(q);

                    return state;

                }

                LockSupport.parkNanos(this, nanos);

            }

            else

                LockSupport.park(this);

        }

    }

    /**

     * Tries to unlink a timed-out or interrupted wait node to avoid

     * accumulating garbage.  Internal nodes are simply unspliced

     * without CAS since it is harmless if they are traversed anyway

     * by releasers.  To avoid effects of unsplicing from already

     * removed nodes, the list is retraversed in case of an apparent

     * race.  This is slow when there are a lot of nodes, but we don't

     * expect lists to be long enough to outweigh higher-overhead

     * schemes.

     */

    private void removeWaiter(WaitNode node) {

        if (node != null) {

            node.thread = null;

            retry:

            for (;;) {          // restart on removeWaiter race

                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {

                    s = q.next;

                    if (q.thread != null)

                        pred = q;

                    else if (pred != null) {

                        pred.next = s;

                        if (pred.thread == null) // check for race

                            continue retry;

                    }

                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,

                                                          q, s))

                        continue retry;

                }

                break;

            }

        }

    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;

    private static final long stateOffset;

    private static final long runnerOffset;

    private static final long waitersOffset;

    static {

        try {

            UNSAFE = sun.misc.Unsafe.getUnsafe();

            Class<?> k = FutureTask.class;

            stateOffset = UNSAFE.objectFieldOffset

                (k.getDeclaredField("state"));

            runnerOffset = UNSAFE.objectFieldOffset

                (k.getDeclaredField("runner"));

            waitersOffset = UNSAFE.objectFieldOffset

                (k.getDeclaredField("waiters"));

        } catch (Exception e) {

            throw new Error(e);

        }

    }

}

FutureTask简单应用:

public class FutureMain {

    public static void main(String[] args)

            throws ExecutionException, InterruptedException {

        //构造FutureTask

        FutureTask<String> futureTask = new FutureTask<String>(new CallableClass("xxx"));

        ExecutorService executorService = Executors.newFixedThreadPool(1);

        //执行FutureTask,发送请求

        //在这里开启线程进行RealData的call()执行

        executorService.submit(futureTask);

        System.out.println("请求完毕。。。");

        try {

            //这里可以进行其他额外的操作,这里用sleep代替其他业务的处理

            Thread.sleep(200);

        }catch (InterruptedException e) {

            e.printStackTrace();

        }

        //获取call()方法的返回值

        //如果此时call()方法没有执行完成,则依然会等待

        System.out.println("真实数据:"+futureTask.get());

    }

}   

上一篇下一篇

猜你喜欢

热点阅读