Callable实现子线程获取函数返回值

2019-05-28  本文已影响0人  Cris_Ma

Callable接口

Java中的子线程通常是通过Thread或者Runnable的方式实现的,但是这种方式只能通过回调,或者共享变量等方式来传递数据,而Callable则是可以获取返回结果的一种子线程实现方式。

Callable是一个接口,源码如下:

public interface Callable<V> {
    V call() throws Exception;
}

非常简单,只有一个方法,和一个泛型V,所以我们创建Callable对象的时候,也只需要指定返回类型并实现call方法就可以了。

Future接口

看完了Callable接口,会发现它非常简单,没有办法在子线程中直接通过它来获取到返回结果的,这时候就需要Future发挥作用了。源码如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到,Future实际上可以理解为Callable的管理类。

在线程池中执行任务时,除了execute方法之外,还有一个submit方法:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

它返回的就是一个Future对象,可以通过它来回去Callable任务的执行结果:


Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Sub Thread is calculating...");
                Thread.sleep(10000);
                return 10;
            }
        };
        
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
    try {
            System.out.println("Main Thread start waiting result... ");

            int res = future.get();
            System.out.println("Main Thread get result: " + res);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

//运行结果:
Main Thread start waiting result... 
Sub Thread is calculating...
Main Thread get result: 10

FutureTask

如果不用Java提供的线程池,直接用Thread怎样在子线程中运行Callable呢? 这时候就要用到FutureTask类了。

FutureTask实现了FutureRunnable接口,这就意味着,它既可以放在Thread中去运行,又能够对任务进行管理,下面是源码:


    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以看到,构造函数中传入了待运行的任务Callable对象或者Runnable对象和指定的返回结果,V result用来指定运行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)来返回null。采用Callable构造方法创建的FutureTask对象,执行完毕返回的是实际运算结果,而Runnable 构造函数返回值是传入的result。

task的状态

FutureTask中持有的任务对象,有以下几种状态:

    private static final int NEW          = 0; //新建或运行中
    private static final int COMPLETING   = 1;//任务运行结束,正在处理一些后续操作
    private static final int NORMAL       = 2;//任务已经完成,COMPLETING的下一个状态
    private static final int EXCEPTIONAL  = 3;//任务抛出异常,COMPLETING的下一个状态
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//收到打断指令,还没有执行interrupt
    private static final int INTERRUPTED  = 6;//收到打断指令,也执行了interrupt

可能的状态变化主要有以下几种:

     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED

task的执行过程

FutureTask实现了Runnable接口,是可以直接放在Thread中执行的,实际上运行的就是它的run方法:

public void run() {
    //r如果当前状态不是NEW,说明任务已经执行完成了,直接返回
    //如果当前状态是NEW,尝试用CAS方式将当前线程赋值给RUNNER,赋值前RUNNER的值应该是null,否则赋值失败
    //赋值失败表示已经有线程执行了run方法,直接返回
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //运行中抛出异常
                    setException(ex);
                }
                //ran为true,说明正常运行结束,得到了返回结果
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

执行结果其实是比较简单的,通过RUNNER来记录执行任务的线程,从而保证只有一个线程可以执行该任务。运行结束后有两个出口:

    protected void setException(Throwable t) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒等待线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

这两个方法实际上是一样的,都是将状态赋值为COMPLETING,然后保存结果(运行结果或错误信息),再执行finishCompletion方法,通知WAITERS里记录的等待线程继续执行,并清空WAITERS

获取返回结果

获取返回结果是通过get()方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
            //cas机制,将新建节点q的next指向原来的节点workers,然后将workers更新为新建的节点。workers(WAITERS)实际上就是持有了所有等待线程的一个链表
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

get()方法比较简单,先判断当前状态,如果状态 < COMPLETING,说明任务没有执行完毕,直接调用awaitDone方法。

awaitDone方法可以接受两个参数,用来指定是否设置超时时间。它内部是一个无限for循环。下面是awaitDone方法的执行步骤(忽略超时设置):

  1. 进入awaitDone方法时,state一定是小于COMPLETING的,第一次会走else if (q == null)分支,创建一个WaitNode()对象用来保存当前线程

  2. 第二次循环q已经不是null了,如果任务仍然没有结束,会执行else if (!queued)分支,queued表示创建的WaitNode()是否已经添加到链表里,如果没有尝试添加,直到添加成功为止。

  3. 等待线程添加成功以后进入下一个循环,此时如果任务仍然没有结束,会走到else分支,挂起当前线程(阻塞)

  4. 此处阻塞的是等待结果的线程,也就是调用FutureTaskget()方法的线程,而不是执行任务的线程。阻塞线程用的是LockSupport.park(this)方法,唤醒的方法是LockSupport.unpark(),该方法在上边的finishCompletion()中出现了,也就是说,任务执行结束(运行完,抛出异常,被取消)时,等待的线程才会被唤醒,继续下一次循环。

  5. 任务结束以后,如果state是COMPLETING状态,说明一些清理任务还没有执行完,等待的线程会让出cpu,让其他线程优先执行

  6. 直到state 大于COMPLETING,说明FutureTask已经完全结束了,此时会会执行(s > COMPLETING)分支,把节点置空,并返回。

awaitDone返回以后,说明任务已经执行完成了,会进入report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

可以看到,如果是正常结束,或者抛出异常结束,会返回结果,而如果是被取消,则会抛出异常。

总结

  1. FutureTask可以视为一个管理Callable任务的工具类,执行Callable任务的是FutureTaskrun方法,所以,可以通过 new Thread(futuretask)的方法来实现子线程执行任务

  2. 获取执行结果是通过FutureTaskget方法,调用该方法后,如果线程会被挂起,知道任务结束为止

  3. 获取结果的线程数量没有限定,可以是任意个线程

  4. 获取结果的线程被挂起以后,可以通过取消,超时等方法在任务执行完毕以前结束挂起状态。

上一篇下一篇

猜你喜欢

热点阅读