Android开发Android开发Java知识

Callable创建Thread的源码解析

2020-08-30  本文已影响0人  巴黎没有摩天轮Li

前言

Java线程的Thread创建共有三种方式,两种类别。第一种类别是没有返回参数的,另一种是具备获取线程返回结果的。第一种类别有两种创建方式,继承Thread类、实现Runnable接口。第二种类别是使用Callable接口通过中间人FutureTask类处理使用。这里我只说第二种的方式创建线程。

使用方式

 int i = Runtime.getRuntime().availableProcessors();
 // 创建线程池
 ThreadPoolExecutor executor = new ThreadPoolExecutor(i * 2 + 1, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
 FutureTask<String> stringFutureTask = new FutureTask<>(() -> {
     for (int j = 0; j < 5; j++) {
         System.out.println(j);
         Thread.sleep(1000);
     }
     return "执行完成";
 });
  // 1
 executor.submit(stringFutureTask);
 try {
      // 阻塞主线程 等待子线程处理完毕后释放
     System.out.println(stringFutureTask.get());
 } catch (ExecutionException e) {
     e.printStackTrace();
 }
System.out.println("主线程");

打印结果

0
1
2
3
4
执行完成
主线程

源码分析

先简单看下代码1处线程池的submit()方法,看是实现ExecutorService这个接口

public interface ExecutorService extends Executor {
    // 具备返回值的 Callable入参方法
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    // 非具备返回值的Callable方法
    Future<?> submit(Runnable task);
}

由此可见,入参Callable接口的方法是带有泛型返回值的方法,而入参为Runnable接口的没有返回值,第二个方法的第二个入参result没什么太大用处,暂且不谈。

Callable

public interface Callable<V> {
    V call() throws Exception;
}

嗯.. 带返回值的方法。
Ok, 我们来看下FutureTask到底是什么鬼。

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {}

实现了RunnableFuture,看看RunnableFuture接口是什么。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

喔?接口多继承了Runnable与Future接口,原来FutureTask类本身就实现了Runnable接口了哇。我好想明白了什么...线程池在调用execute的时候,一定调用了FutureTask中的run()方法。
再来看看Future接口都有什么秘密。

public interface Future<V> {
    // 取消FutureTask任务 当任务已经完成了就无法取消了
    boolean cancel(boolean mayInterruptIfRunning);
    // 是否任务已经取消
    boolean isCancelled();
    // 是否任务执行完成
    boolean isDone();
    // 获取任务返回参数 无限阻塞所在线程
    V get() throws InterruptedException, ExecutionException;
    // 带有阻塞超时时间的获取任务返回参数 建议使用这个方法
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

噢噢,原来是FutureTask的一些调度方法啊。FutureTask骨架是明朗了,FutureTask实现了Runnable接口,并也实现了调度Future接口。我们回到FutureTask类。

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;// 线程任务的创建
    private static final int COMPLETING   = 1;// 任务执行中 
    private static final int NORMAL       = 2;// 任务执行完毕
    private static final int EXCEPTIONAL  = 3;// 任务执行异常
    private static final int CANCELLED    = 4;// 任务取消成功
    private static final int INTERRUPTING = 5;// 任务被打断中
    private static final int INTERRUPTED  = 6;// 任务被打断成功

    private Callable<V> callable; // 构造传入的Callable接口
  
    private Object outcome;// 异步线程返回结果
   
    private volatile Thread runner;// 当前任务所运行的线程
    
    private volatile WaitNode waiters;// 记录当调用get方法时,阻塞的线程

    // 记录任务所在的线程
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    // 构造1 没有什么可说的 直接将Callable接口赋值
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; 
    }

    // 构造2 
    public FutureTask(Runnable runnable, V result) {
        // 1
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;  
    }

      // 执行任务 run 方法
      public void run() {
        // 状态是创建过程的话,通过CAS 将当前线程赋值给成员变量runner 若失败就直接return
        if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            // 临时变量接一下
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                // 返回值
                V result;
                boolean ran;
                try {
                    // 调用call方法执行
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 捕捉异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // 接收成功后将result值赋值给成员变量outcome CAS方式
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

我们看下构造方法2 中的代码1处,发掘这里使用了适配器模式,若使用的是构造方法二的初始化方式,则将Runnable转化成了Callable。

public static Callable<Object> callable(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    // result 默认为null
    return new RunnableAdapter<Object>(task, null);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    // result 是我们初始化传入的,其实并无太大作用
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

ok, 看到这里我们知晓了,原来FutureTask为了能够实现Callable方法具备返回值,整合了Runnable与Callable,这样的话本质上还是执行Runnable方法。

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // 这里若当前任务还在执行中,并且过了超时时间,直接抛出Timeout异常。
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

核心awaitDone()方法

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 等待线程队列
        WaitNode q = null;
        boolean queued = false;
        // 开辟个死循环
        for (;;) {
            // 若当前线程已经被打断了
            if (Thread.interrupted()) {
                // 从等待线程队列中移除 并抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 获取当前状态
            int s = state;
            // 若执行完成
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                // 返回当前状态
                return s;
            }
            // 若当前任务正在执行 
            else if (s == COMPLETING) // cannot time out yet
                // 1 既然正在执行,就暂时不一直for循环占有CPU资源,将其让出 并同时和其他线程竞争
                Thread.yield();
            else if (q == null)
                // 这里只会执行一次
                q = new WaitNode();
            else if (!queued)
                // 如果是排队 就将等待队列next 指向当前的等待线程
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                // 计算等待超时时间 截止时间减去当前时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    // 超时了
                    removeWaiter(q);
                    return state;
                }
                // 没有过超时时间就让线程等待
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

cancel()方法

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // 直接将当前线程打断
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

最终report()

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

将成员变量outcome返回,这个outcome赋值是在run()方法调用完c.call()方法后进行的。

总结

FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call() 代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。

通过 FutureTask 把 Runnnable、Callable、Future 都串起来了,使 FutureTask 具有三者的功能,统一了 Runnnable 和 Callable,更方便使用。

上一篇 下一篇

猜你喜欢

热点阅读