Java Executors.newFixedThreadPoo
Exectuor
接口只有一个方法
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService
继承了接口Exectuor
, 定义了一些用于管理性的方法. 比如说shutdown
系列方法, 同时返回Future
类型的对象来追踪任务的进展情况.
ExecutorService
, 因为太长了,去掉了注释, 仅留下了方法签名, 后面有一些方法的实例.
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Executors.newFixedThreadPool
该工厂方法返回的实际类型:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
submit(Runnable task)
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
因为Runnable
中的run
方法是没有返回值的, 但是为了统一形式(返回Future
对象), 所以该submit
方法依然会返回一个Future
对象. 可以通过该Future
对象的get
方法来获取任务的运行情况, 如果返回的是null
, 说明任务成功运行结束, 其他情况调用get
方法之后会抛出对应的异常, 后面有例子会说明.
简单例子:
成功的情况, future.get()
直接返回了null
.
public static void testSubmitRunnable() throws ExecutionException, InterruptedException {
Runnable task = () -> {
try {
System.out.println("Task is running!");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Future<?> future = executorService.submit(task);
// null 表示成功
System.out.println("future.get()");
// get() blocks until the runnable finishes
System.out.println(future.get());
}
输出:
future.get()
Task is running!
null
submit(Runnable task, T result)
相比上面的submit
, 多了一个T result
参数. 即当任务成功完成之后, future.get()
会返回该result
(而不是前面的null
).
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
例子略, 因为和上面的submit
基本一致.
Future.get()
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
因为两个get
方法都是阻塞的, 不同的是后者提供了一个超时操作, 并在超时以后抛出TimeoutException
异常, 用于通知用户超时.
其他异常的抛出情况:
- CancellationException
如果该任务已经被cancel
了, 通过Future.cancel
来进行取消, 那么再调用get
的时候就会抛出该异常. - ExecutionException
如果我们传进去的Runnable
在执行的过程中抛出了异常, 那么Future.get()
就会得到该异常, 可以从ExecutionException
中的getCause
得到抛出的异常的信息. - InterruptedException
因为该方法是阻塞的, 所以可能有等待过程. 如果在等待过程中, 当前线程被interrupt
了, 那么就会抛出该异常.
所以get()
方法主要是通过异常来通知用户任务的执行情况. 如果一切正常, 那么返回null
(当调用submit(Runnable)
时)或者我们传进去的某个结果(submit(Runnable, T result)
), 或者我们使用的是Callable
, 那么返回的就是任务的返回值.
实例:
public static void testSubmitRunnable2() throws InterruptedException, ExecutionException {
// 在Runnable中故意抛出一个异常
Runnable task = () -> {
throw new RuntimeException("This exception is deliberately thrown");
};
// 用于调用`submit(Runnable, result)方法
Boolean flag = true;
Future<Boolean> future = executorService.submit(task, flag);
// 调用Future.get()
try {
System.out.println("future.get()");
System.out.println(future.get());
} catch (ExecutionException e) {
System.out.println("The task threw a exception");
System.out.print("And the cause is: ");
System.out.println(e.getCause().getMessage());
}
// 成功后返回值为我们传进去的值
task = () -> {};
future = executorService.submit(task, flag);
System.out.println("return value from future.get(): " + future.get());
// 演示取消后task后, 再调用get方法
task = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
future = executorService.submit(task, flag);
// cancel的参数用于指定是否取消已经在执行的任务
future.cancel(true);
try {
future.get();
} catch (CancellationException e){
System.out.println("The task is cancelled");
}
// 超时的情况
task = () -> {
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
future = executorService.submit(task, flag);
try {
future.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
System.out.println("the task is not finished within the given time");
// 这里也可以看到cancel(true)的效果
// 即使任务已经在执行了, 还是会通过interrupt去打断该任务
future.cancel(true);
}
}
输出:
future.get()
The task threw a exception
And the cause is: This exception is deliberately thrown
return value from future.get(): true
The task is cancelled
the task is not finished within the given time
java.lang.InterruptedException: sleep interrupted
使用Callable的情况
Callable
其实和Runnable
基本一致, 只是能够返回值. 而且ExecutorService
可以将返回值放入到Future
中, 我们可以通过get
方法得到.
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
AbstractExecutorService
中对三种方法签名的实现:
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
newTaskFor
:
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask
:
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
* <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V>{...}
RunnableFuture
:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}