ThreadPool 执行任务抛出异常如何处理

2020-08-18  本文已影响0人  我是许仙

ThreadPool 执行任务抛出异常

执行任务的方式

线程池执行任务有2中方式

  1. ThreadPool.execute(Runable task); 直接抛出异常。删除异常线程并创建一个新的。
  2. ThreadPool.submit(Callable task); ThreadPool.submit(Runable task); 需要 future.get()才能抛出异常。因为在ThreadPool内部没有抛出异常,所以线程不会停止也不会新增线程。

这2种方式都是可以执行任务,

线程池execute执行任务抛出异常 处理流程

runWorker代码

final void runWorker(Worker w) {
   completedAbruptly = true;
    ....
    try {
        while (task != null || (task = getTask()) != null) {
            ....
            try {
                Throwable thrown = null;
                try {
                    //1 执行run方法
                    task.run();
                } catch (RuntimeException x) {
                    //2 抛出异常
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //3 最终执行函数
        processWorkerExit(w, completedAbruptly);
    }
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
      if (completedAbruptly) 
        //1 cas操作核心线程数自减1 
        decrementWorkerCount();
        .....
         //2 删除抛出异常的线程
        workers.remove(w);
        ....
        //3  添加一个新的线程到工作列队 
        addWorker(null, false);
}

示例代码

public class ThreadPoolDemo  implements Runnable{

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new MyExecutePool(3,3,2,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        threadPoolExecutor.execute(new ThreadPoolDemo());
        threadPoolExecutor.execute(new ThreadPoolDemo());
        threadPoolExecutor.execute(new ThreadPoolDemo());
        threadPoolExecutor.execute(new ThreadPoolDemo());
    }
    @Override
    public void run() {
        test();
        System.out.println("执行结束");
    }


    private void test() {
        throw new RuntimeException(Thread.currentThread().getName() + "报错");
    }
}

线程运行初期有3个线程1,2,3,

006UamUGly1ghv73jyva1j30qg094ac2.png

当抛出异常后调用processWorkerExit()方法会在生成一个线程4

006UamUGly1ghv73bwy29j30s6084gnb.png

当线程3执行完抛出异常且被jvm回收,线程池有还有3个核心线程。线程4代替线程3工作。

006UamUGly1ghv72j7hhwj30qi0643zo.png image-20200817214436682.png

线程池submit任务抛出异常 处理流程

案例
 public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new MyExecutePool(3,3,2,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
       
        Future future = threadPoolExecutor.submit(new ThreadPoolDemo());
        Future future1 = threadPoolExecutor.submit(new ThreadPoolDemo());
        Future future2 = threadPoolExecutor.submit(new ThreadPoolDemo());

        try {
            future.get();
        } catch (ExecutionException e) {
            System.out.println(e.getMessage());
        }
    }

    @Override
    public void run() {
        test();
    }

    private void test() {
        String name = Thread.currentThread().getName();
        throw new RuntimeException( name+ "报错");
    }
006UamUGly1ghv77aywexj314w07iq3t.png

可以只有在future.get()的时候在会抛出异常,而且抛出异常的线程并没有因此关闭而是继续运行。可以任务FutureTask在ThreadPool的基础上帮我们做了异常捕捉

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = new FutureTask<T>(task, null);
    execute(ftask);
    return ftask;
}

可以看到这里对Runnable进行了封装,封装成了RunnableFuture的实现类FutureTask。

然后调用 execute(task);方法执行业务。源码中execute最终执行的是实现了Runable的run()方法的RunnableFuture对象。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //1 执行业务逻辑
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //2 记录异常
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
       .....
    }
}

从代码中我们可以看出步骤1执行了业务逻辑的执行,而且用catch给捕捉了所以并不会抛去异常。再看步骤2

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

这里把返回值设置为异常Throwable并且把状态更新为EXCEPTIONAL

最终调用 futre.get() 会调用report()方法

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
上一篇 下一篇

猜你喜欢

热点阅读