ThreadPool 执行任务抛出异常如何处理
2020-08-18 本文已影响0人
我是许仙
ThreadPool 执行任务抛出异常
执行任务的方式
线程池执行任务有2中方式
- ThreadPool.execute(Runable task); 直接抛出异常。删除异常线程并创建一个新的。
- ThreadPool.submit(Callable task); ThreadPool.submit(Runable task); 需要 future.get()才能抛出异常。因为在ThreadPool内部没有抛出异常,所以线程不会停止也不会新增线程。
这2种方式都是可以执行任务,
线程池execute执行任务抛出异常 处理流程
runWorker代码
final void runWorker(Worker w) {
completedAbruptly = true;
....
try {
while (task != null || (task = getTask()) != null) {
....
try {
Throwable thrown = null;
try {
//1 执行run方法
task.run();
} catch (RuntimeException x) {
//2 抛出异常
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//3 最终执行函数
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
//1 cas操作核心线程数自减1
decrementWorkerCount();
.....
//2 删除抛出异常的线程
workers.remove(w);
....
//3 添加一个新的线程到工作列队
addWorker(null, false);
}
示例代码
public class ThreadPoolDemo implements Runnable{
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new MyExecutePool(3,3,2,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.execute(new ThreadPoolDemo());
threadPoolExecutor.execute(new ThreadPoolDemo());
threadPoolExecutor.execute(new ThreadPoolDemo());
threadPoolExecutor.execute(new ThreadPoolDemo());
}
@Override
public void run() {
test();
System.out.println("执行结束");
}
private void test() {
throw new RuntimeException(Thread.currentThread().getName() + "报错");
}
}
线程运行初期有3个线程1,2,3,

当抛出异常后调用processWorkerExit()方法会在生成一个线程4

当线程3执行完抛出异常且被jvm回收,线程池有还有3个核心线程。线程4代替线程3工作。


线程池submit任务抛出异常 处理流程
案例
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new MyExecutePool(3,3,2,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
Future future = threadPoolExecutor.submit(new ThreadPoolDemo());
Future future1 = threadPoolExecutor.submit(new ThreadPoolDemo());
Future future2 = threadPoolExecutor.submit(new ThreadPoolDemo());
try {
future.get();
} catch (ExecutionException e) {
System.out.println(e.getMessage());
}
}
@Override
public void run() {
test();
}
private void test() {
String name = Thread.currentThread().getName();
throw new RuntimeException( name+ "报错");
}

可以只有在future.get()的时候在会抛出异常,而且抛出异常的线程并没有因此关闭而是继续运行。可以任务FutureTask在ThreadPool的基础上帮我们做了异常捕捉
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = new FutureTask<T>(task, null);
execute(ftask);
return ftask;
}
可以看到这里对Runnable进行了封装,封装成了RunnableFuture的实现类FutureTask。
然后调用 execute(task);方法执行业务。源码中execute最终执行的是实现了Runable的run()方法的RunnableFuture对象。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//1 执行业务逻辑
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//2 记录异常
setException(ex);
}
if (ran)
set(result);
}
} finally {
.....
}
}
从代码中我们可以看出步骤1执行了业务逻辑的执行,而且用catch给捕捉了所以并不会抛去异常。再看步骤2
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
这里把返回值设置为异常Throwable并且把状态更新为EXCEPTIONAL
最终调用 futre.get() 会调用report()方法
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}