Java并发编程

Java并发编程 - FutureTask

2019-03-19  本文已影响15人  HRocky

Callable

我们知道线程运行任务其中的一个方法就是创建一个实现Runnable接口的类,然后通过Thread的构造方法设置进去,线程启动后就可以执行Runnalbe的逻辑。

可以回顾一样Runnable接口的定义:

java.lang.Runnable

public interface Runnable { 
    public abstract void run();
}

可以看到Runnable接口只定义了一个方法,而且这个方法没有返回值。

现在如果我们执行一个任务需要它返回给我们运行结果,该怎么做?

Java为我们提供了另外一个接口Callable,貌似也有这样的功能,相比于Runnable,它允许有返回值,并且可以抛出检查型异常信息。

java.util.concurrent.Callable

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看到它还是一个泛型接口,方法的返回值可以泛型化。

于是,我们类似这样定义我们的任务,提供返回值。

public class MyTask implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        return 1 * 1;
    }
    
}

当你这样定义后,你会发现了一个问题,任务是需要线程来运行了,但是看了一下Thread类,发现没有类似构造方法或者是普通方法来将Callable类型的对象设置进去,Thread只能运行Runnable类型的任务对象。我想JDK API的制作者将这个接口的名字定义为"Callable"是有特殊含义: 实现它接口的类只是一个可被调用的对象,而不是一个可运行的对象。

线程运行提供返回值,现在这个需求我们卡住了,先暂时放在一边,我们继续。

Runnable和Callable都可以理解为任务具体业务逻辑封装接口。Java是面向对象的,一切都是对象,一段具体的业务逻辑代码当然也就需要有某种东西封装起来,实现了Runnable或Callable接口的类创建的对象就是业务逻辑代码的载体。

Future

有时候线程执行任务是耗时的,这种情况下我们希望不要浪费时间一直等待它返回结果,而是希望利用等待的时间处理其他任务,只要知道有一个线程在跑,并且它在某个时刻会返回结果就行了。也就是说希望我们的执行是异步的。

异步:当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

Java提供了Future接口来实现对异步调用的支持。下面是接口的定义:

java.util.concurrent.Future

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

A Future represents the result of an asynchronous computation.

API对这个接口的描述是:Future代表异步计算的结果。这个接口提供了get方法,用于获取异步执行后的结果。

现在我们通过实现这个接口来定义我们的任务类,跟上面类似你会发现,先不说异步获取结果,首先这个定义出来的任务类创建的对象必须可以被线程调用,但是仅仅执行这个接口的任务类不满足。

也就是说我们需要一个既能被线程运行又能异步获取结果的类存在。

Java定义了RunnableFuture接口来支持这种需求。RunnableFuture接口的定义如下:

java.util.concurrent.RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture接口继承了Runnable接口和Future接口,这样实现了这个接口的任务类能被线程调用,然后也能异步获取结果。

不过这里要注意:Future只是表示可异步获取结果的接口类,它只是一个规定,实现它接口的类要实现它的这种语义。当然我们可以自己实现如何异步调用,又如何返回异步调用的结果。不过JDK为我们提供了FutureTask接口,我们直接使用它就可以了。

FutureTask

public class FutureTask<V> implements RunnableFuture<V>

上面是FutureTask的定义,可以看到实现了RunnableFuture接口,这样通过它创建的对象就是可被线程运行的和可异步获取结果的。

可以简单地从字面上理解这个类:未来任务类。也就是说创建这个任务之后直接交给线程执行,什么时候运行不用关心了,相信你运行后会带回来某些东西。

##FutureTask类概览

FutureTask.png

上面不是说过可返回结果的Callable对象无法被线程运行嘛,从上面可以看出FutureTask可接收Callable类型的对象,并且FutureTask还是可运行的,那么在FutureTask的run方法运行Callable的call方法,就到达了线程运行Callable代码逻辑的效果。

##可能的状态转移

在我们之前使用Runnable的时候,实现了Runnable接口的类创建的对象就是一个任务,创建这个任务后交给线程,之后这个任务对我们来说就是不可控了,但是我们的"FutureTask"必须是可控的,因为业务逻辑执行完成后,需要得到计算的结果。

正如上面FutureTask类概览所示,state用来表示FutureTask的内部状态,所拥有的状态如上图所示,可能的状态转移为:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

##FutureTask执行原理

1. 结果的载体

上面我们已经说了Callable对象封装了业务逻辑代码,结果是业务逻辑代码执行后产生的,虽然说Callable对象提供的call方法可以返回执行后的结果,但是我们无法直接从这对象中获取,因为这个对象不能被线程直接调用,能被线程直接调用的是实现了Runnable接口的类的对象,执行的代码在run方法体中,也就是Callable封装的业务逻辑代码必须以某种方式被放到实现了Runnable接口的类的对象的run方法中执行。而FutureTask是一个实现了Runnable接口的类,它可以用来执行Callable封装的逻辑。可以看到FutureTask内部有:

private Callable<V> callable;

这个属性,它用来接收封装了业务逻辑代码的Callable对象。也就是线程通过调用FutureTask就间接地调用了Callable对象封装的逻辑代码,这个调用发生在run方法中。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Runnable接口的run方法是被线程调用的,而且上面我们说过这个方法是没有返回值的,就算我们持有了FutureTask对象的引用,并且Callable对象的逻辑已经在run方法内执行,我们也没法获取到Callable对象执行返回的值,不过既然我们有FutureTask对象的持有权,而且Callable对象方法的执行在FutureTask对象的run方法内部,那么我们就可以在FutureTask内部定义一个属性来接收Callable对象方法的返回值。FutureTask的outcome属性正是启到这个作用:

 private Object outcome; // non-volatile, protected by state reads/writes

这个值的设置就是在run方法的内部:

 if (ran)
     set(result);

2. 结果的获取

FutureTask实现类Future接口,Future定义的get方法提供了获取异步任务的执行结果的作用,这个执行结果就是我们上面说的outcome的值。

Callable对象封装的业务逻辑代码执行操作可能是非常耗时的,也就是说cal
l方法一时半会执行不完,执行不完就无法将执行的结果设置到outcome中,如果FutureTask的方法只是简单的像下面这样返回outcome:

public V get() throws InterruptedException, ExecutionException {
    return (V) outcome;
}

那么返回的这个outcome就毫无意义,因为业务逻辑代码还没执行完,根本就还没设置结果。

如何解决呢?解决的办法就是如果业务逻辑代码没有执行完,那么当线程调用get方法获取结果的时候就让他挂起,让它知道处理还没执行完,你这时候要获取这个值的话得到的也是无用的数据,所以你要等等。

现在就有两个问题了:如何判断业务逻辑是否执行完?如何挂起?什么唤醒?

@@如何判断业务逻辑是否执行完

逻辑逻辑代码执行完就是FutureTask的run方法正确无误地调用了Callable对象的call方法,然后成功的设置了outcome值,也就是说如果到了这个时间点那么就可以说任务执行完了。也就是说到这个点之后设置一个状态变量来表示任务完成,FutureTask的state就是这样的一个状态变量,判断这个状态具有某值之后那么就表明任务已经完成了。

所以当线程调用get方法的时候首先就应该判断这个值是否就是表明任务完成的特定值,如果不是那么当前线程就要别挂起。

@@如何挂起

上面我们已经说过了当调用get方法,线程看到状态变量的值不是表示任务完成的值时,它要暂停执行,想想我们学过的知识使一个线程挂起有两种(不考虑sleep)方式:Object的wait方法和LockSupport的park方法。Object的wait方法需要同步机制的支持,而我们这里没有共享资源访问同步的问题(state是共享资源,但是会采用无锁的机制),所以不适合。FutureTask内部采用的是LockSupport的park方法来挂起线程。

LockSupport.park(this);

这里还有一个问题就是会有多个线程请求get方法,那么它们都需要挂起,而后面要重新唤醒它们,那么就需要有个地方暂存它们,FutureTask使用
Treiber Stack栈这种数据结构来暂存挂起的线程。

Treiber Stack是一个可扩展的无锁栈,利用细粒度的并发原语CAS来实现的。

FutureTask内部定义了代表当前线程的WaitNode节点:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

通过CAS操作来实现节点的入栈:

UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q)

@@什么时候唤醒

当业务逻辑代码执行完毕,并且成功设置了outcome值后就要唤醒之前栈中被挂起的线程。

FutureTask代码中,当run方法内部Callable调用call执行业务逻辑操作无误后,就会执行设置outcome的操作,outcome操作设置成功后,会做唤醒操作。

run

try {
    result = c.call();
    ran = true;
} catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex);
}
if (ran)
    set(result);

set

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

3. 关于取消

Future接口定义了取消任务的方法,可提供取消任务的功能:

boolean cancel(boolean mayInterruptIfRunning)

这里,我们首先要弄明白一点,取消任务是取消什么?

FutureTask说到底还是像Runnable一样被线程调用,线程调用就是执行run方法,所以取消任务就是中断执行run方法的那个线程,哪个线程在执行FutureTask的run方法,FutureTask是有记录的:

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

这个属性就是用来记录执行FutureTask的线程对象的。这个属性在run方法中设置:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
}

cancel逻辑就是中断这个线程:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

总结

FutureTask说白了就是一个实现了Runnable接口的类,线程可以调用它的run方法。但是它跟普通实现Runnable接口类不一样,它又实现了Future接口,内部通过包装Callable这个能执行业务逻辑代码又能返回结果的对象使得它具有了返回处理结果的能力。同时通过其内部定义的任务处理状态的属性的支持,使得它能感知任务处理状态,通过这个状态来挂起和唤醒想获取它处理结果的线程。

FutureTask是一种主动感知状态的异步调用模式。

其他的异步模式有:通知和回调。FutureTask是主动去询问状态。

上一篇 下一篇

猜你喜欢

热点阅读