Java 并发

【Java 并发笔记】线程池相关整理(上)

2019-01-29  本文已影响63人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

2. 线程池实现类

Executor 类图

Runnable 接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Callable 接口

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Executor 接口

方法 说明
void execute(Runnable command) 在未来某个时间执行给定的命令。
public interface Executor {
    void execute(Runnable command);
}

ExecutorService 接口

public interface ExecutorService extends Executor {
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit);
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown();
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated();
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
void shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow();
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result);
}

扩展的跟踪异步线程

方法 说明
Future<?> submit(Runnable task) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。
<T> Future<T> submit(Runnable task,T result) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
<T> Future<T> submit(Callable<T> task) 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get() 形式的构造。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。注意,可以正常地或通过抛出异常来终止已完成任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException 超时等待,同上。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException 与 invokeAll的区别是,任务列表里只要有一个任务完成了,就立即返回。而且一旦正常或异常返回后,则取消尚未完成的任务。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException 超时等待,同上。
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException 一直等待,直到所有任务完成。请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行,或者超时时间的到来如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false。

管理生命周期

方法 说明
void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。
List<Runnable> shutdownNow() 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,在 ThreadPoolExecutor 中,是通过 Thread.interrupt() 来做取消实现,所以如果任务无法响应中断,则永远无法终止。
boolean isShutdown() 如果此执行程序已关闭,则返回 true。
boolean isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown()shutdownNow(),否则 isTerminated() 永不为 true。

Future 接口

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

2.1 AbstractExecutorService

submit 方法

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
/**
 * Returns a {@code RunnableFuture} for the given runnable and default
 * value.
 *
 * @param runnable the runnable task being wrapped
 * @param value the default value for the returned future
 * @param <T> the type of the given value
 * @return a {@code RunnableFuture} which, when run, will run the
 * underlying runnable and which, as a {@code Future}, will yield
 * the given value as its result and provide for cancellation of
 * the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

/**
 * Returns a {@code RunnableFuture} for the given callable task.
 *
 * @param callable the callable task being wrapped
 * @param <T> the type of the callable's result
 * @return a {@code RunnableFuture} which, when run, will call the
 * underlying callable and which, as a {@code Future}, will yield
 * the callable's result as its result and provide for
 * cancellation of the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
/**
 * A callable that runs given task and returns given result
 */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

invokeAll 方法

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false; // 所有任务是否完成的标志
    try {
        // 对所有任务进行包装,并提交任务,并将返回的结果添加到futures集合中
        for (Callable<T> t : tasks) { 
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        // 对所有结果进行判断或者阻塞等待结果返回
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) { // 如果任务没有完成
                try {
                    f.get(); // 则阻塞等待结果返回,并压制异常
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        // 当所有任务已经完成了(不管是正常完成还是异常完成,
        // 如发生CancellationException、ExecutionException ),
        // 则将完成标志设为true,并返回结果集合
        done = true; 
        return futures;
    } finally {
        if (!done) // 如果发生中断异常InterruptedException 则取消已经提交的任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false; 
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            // 在添加执行任务时超时判断,如果超时则立刻返回futures集合
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }
        // 每次对结果进行判断时都进行超时判断
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) { // 判断超时
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime(); // 更新剩余时间
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

invokeAny 方法

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    // 含有结果的Future队列
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    // 将本对象作为Executor创建ExecutorCompletionService对象
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // 记录可能抛出的执行异常
        ExecutionException ee = null;
        // 初始化超时时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 确定在主循环之前开始一个任务
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1; // 记录正在执行的任务数量

        for (;;) {
            // 获取并移除下一个将要完成的任务的结果表示,如果没有任何表示则返回null
            Future<T> f = ecs.poll();// 底层调用队列的poll方法(非阻塞)
            if (f == null) { // 没有结果表示
                if (ntasks > 0) { //如果还有剩余的任务,则提交下一个任务
                    --ntasks;
                    futures.add(ecs.submit(it.next())); 
                    ++active;
                }
                // 出现这种情况说明,已经有任务完成,并返回结果表示,但是
                // 捕获到了异常,则跳出主循环,进行异常的抛出
                else if (active == 0) 
                    break;
                else if (timed) { // 超时获取结果表示
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else // 阻塞获取结果表示
                    f = ecs.take();
            }
            if (f != null) { //含有结果表示
                --active;
                try {
                    return f.get(); // 返回结果
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }
        // 
        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally { // 最后取消所有提交的任务
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}
// 对doInvokeAny的封装,实现无超时等待的版本
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
 // 对doInvokeAny的封装,实现超时等待的版本
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// ExecutorCompletionService 的submit方法
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}
// 阻塞队列:用来存储已经完成的任务
private final BlockingQueue<Future<V>> completionQueue;

/**
 * 拓展了 FutureTask 在完成时将任务入队功能
 */
private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    // 此方法在FutureTask任务run方法完成时调用,这里是将完成的任务入队
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

public Future<V> poll() {
    return completionQueue.poll();
}

public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}

2.2 FutureTask

核心成员变量

变量名称 说明
Callable callable 被提交的任务。
Object outcome 任务执行结果或者任务异常。
volatile Thread runner 执行任务的线程。
volatile WaitNode waiters 等待结点,关联等待线程。
long stateOffset state 字段的内存偏移量。
long runnerOffset runner 字段的内存偏移量。
long waitersOffset waiters 字段的内存偏移量。
private static final int NEW          = 0; //任务新建和执行中
private static final int COMPLETING   = 1; //任务将要执行完毕
private static final int NORMAL       = 2; //任务正常执行结束
private static final int EXCEPTIONAL  = 3; //任务异常
private static final int CANCELLED    = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED  = 6; //任务线程已中断

run() 方法

public void run() {        // 校验任务状态
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;       // double check
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {            //执行业务代码
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {        // 重置runner
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
protected void setException(Throwable t) {
    // state状态 NEW->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // COMPLETING -> EXCEPTIONAL 到达稳定状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
        // 一些结束工作
        finishCompletion();
    }
}
......
protected void set(V v) {
    // state状态 NEW->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // COMPLETING -> NORMAL 到达稳定状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // 一些结束工作
        finishCompletion();
    }
}

get 和 get(long, TimeUnit) 方法

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

awaitDone 方法

/**
 1. 等待任务执行完毕,如果任务取消或者超时则停止
 2. @param timed 为true表示设置超时时间
 3. @param nanos 超时时间
 4. @return 任务完成时的状态
 5. @throws InterruptedException
 */
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 任务截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        if (Thread.interrupted()) {
            //线程中断则移除等待线程,并抛出异常
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            // 任务可能已经完成或者被取消了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // 可能任务线程被阻塞了,主线程让出CPU
            Thread.yield();
        else if (q == null)
            // 等待线程结点为空,则初始化新结点并关联当前线程
            q = new WaitNode();
        else if (!queued)
            // 等待线程入队列,成功则queued=true
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //已经超时的话,移除等待结点
                removeWaiter(q);
                return state;
            }
            // 未超时,将当前线程挂起指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            // timed=false时会走到这里,挂起当前线程
            LockSupport.park(this);
    }
}

finishCompletion 方法

/**
 * 移除并唤醒所有等待线程,执行done,置空callable
 * nulls out callable.
 */
private void finishCompletion() {
    //遍历等待结点
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒等待线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // unlink to help gc
                q.next = null;
                q = next;
            }
            break;
        }
    }
    //模板方法,可以被覆盖
    done();
    //清空callable
    callable = null;
}

cancel 方法

public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    if (mayInterruptIfRunning) {
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if (t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    finishCompletion();
    return true;
}

接下一篇 【Java 并发笔记】线程池相关整理(下)

上一篇下一篇

猜你喜欢

热点阅读