Python ThreadPoolExecutor 异常中止解决

2022-06-10  本文已影响0人  Simple丶Plan

1. 原始方案

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random


def worker(n):
    threading_name = threading.current_thread().name
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

def loop_worker():
    threadPool = ThreadPoolExecutor(max_workers=3)
    for q in range(20):
        threadPool.submit(worker, q)
    threadPool.shutdown(wait=True)

if __name__ in '__main__':
    loop_worker()
[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_1] 5 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_2] 6 start
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_0] 7 start
[ThreadPoolExecutor-1_1] 5 end
[ThreadPoolExecutor-1_2] 6 end
[ThreadPoolExecutor-1_0] 7 end
......

通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运行。线程池方案下,Ctrl-C 失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt

1.1 上下文管理器协议格式

def loop_worker():
    with ThreadPoolExecutor(max_workers=3) as executor:
        for q in range(20):
            executor.submit(worker, q)

上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True),同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__() 方法,接下来进行异常中止的基础。

2. 以全局变量或事务为标识进行判断

适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random

sign = 0
exiting = threading.Event()

def worker(n):
    threading_name = threading.current_thread().name
    if sign == 1 or exiting.is_set():
        # 满足则直接跳过主程序
        print(f'[{threading_name}] {n} skip')
        return 
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

def loop_worker():
    with ThreadPoolExecutor(max_workers=3) as executor:
        for q in range(20):
            executor.submit(worker, q)

if __name__ in '__main__':
    loop_worker()

程序运行中,只需 sign = 1 或者 exiting.set() ,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。

[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_1] 5 skip
[ThreadPoolExecutor-1_2] 6 skip
......
[ThreadPoolExecutor-1_0] 18 skip
[ThreadPoolExecutor-1_2] 17 skip
[ThreadPoolExecutor-1_1] 19 skip

3. 接收 KeyboardInterrupt 异常取消线程任务

from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random

def worker(n):
    threading_name = threading.current_thread().name
    print(f'[{threading_name}] {n} start')
    time.sleep(random.randint(1, 6))  # 随机休眠1-6s
    print(f'[{threading_name}] {n} end')

with ThreadPoolExecutor(max_workers=6) as executor:
    threadPool = []
    for q in range(20):
        task = executor.submit(worker, q)
        threadPool.append(task)  # task 任务加入 threadPool
    try:
        while not list(reversed(threadPool))[0].done():  # 判断最后一个任务是否取消/完成
            # 代替 wait(threadPool, return_when=FIRST_EXCEPTION)
            # 利用 while 堵塞且能够接收 KeyboardInterrupt 异常
            time.sleep(2)
    except KeyboardInterrupt:
        # 接收 KeyboardInterrupt 并取消剩余线程任务
        print('KeyboardInterrupt')
        for task in reversed(threadPool):
            task.cancel()

提交给线程池的每个线程任务 task 加入 threadPool中,方便后续对 task 进行操作。当 for 循环内的 task 全部提交后,线程会再后台运行,而进程运行至 while 中堵塞,直至 threadPool 中最后一个线程是否 .done()。若进程堵塞在 while 中接收到 Ctrl+CKeyboardInterrupt 异常,则从后往前取消 threadPool 中所有任务,达到中止目的。

[ThreadPoolExecutor-0_0] 0 start
[ThreadPoolExecutor-0_1] 1 start
[ThreadPoolExecutor-0_2] 2 start
[ThreadPoolExecutor-0_1] 1 end
[ThreadPoolExecutor-0_1] 3 start
[ThreadPoolExecutor-0_0] 0 end
[ThreadPoolExecutor-0_0] 4 start
[ThreadPoolExecutor-0_1] 3 end
[ThreadPoolExecutor-0_1] 5 start
KeyboardInterrupt
[ThreadPoolExecutor-0_2] 2 end
[ThreadPoolExecutor-0_0] 4 end
[ThreadPoolExecutor-0_1] 5 end
上一篇下一篇

猜你喜欢

热点阅读