Python ThreadPoolExecutor 异常中止解决
2022-06-10 本文已影响0人
Simple丶Plan
1. 原始方案
from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random
def worker(n):
threading_name = threading.current_thread().name
print(f'[{threading_name}] {n} start')
time.sleep(random.randint(1, 6)) # 随机休眠1-6s
print(f'[{threading_name}] {n} end')
def loop_worker():
threadPool = ThreadPoolExecutor(max_workers=3)
for q in range(20):
threadPool.submit(worker, q)
threadPool.shutdown(wait=True)
if __name__ in '__main__':
loop_worker()
[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_1] 5 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_2] 6 start
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_0] 7 start
[ThreadPoolExecutor-1_1] 5 end
[ThreadPoolExecutor-1_2] 6 end
[ThreadPoolExecutor-1_0] 7 end
......
通常情况,我们利用 Ctrl+C
让程序触发 KeyboardInterrupt
异常,中止程序运行。线程池方案下,Ctrl-C
失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt
。
1.1 上下文管理器协议格式
def loop_worker():
with ThreadPoolExecutor(max_workers=3) as executor:
for q in range(20):
executor.submit(worker, q)
上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True)
,同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__()
方法,接下来进行异常中止的基础。
2. 以全局变量或事务为标识进行判断
适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。
from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random
sign = 0
exiting = threading.Event()
def worker(n):
threading_name = threading.current_thread().name
if sign == 1 or exiting.is_set():
# 满足则直接跳过主程序
print(f'[{threading_name}] {n} skip')
return
print(f'[{threading_name}] {n} start')
time.sleep(random.randint(1, 6)) # 随机休眠1-6s
print(f'[{threading_name}] {n} end')
def loop_worker():
with ThreadPoolExecutor(max_workers=3) as executor:
for q in range(20):
executor.submit(worker, q)
if __name__ in '__main__':
loop_worker()
程序运行中,只需 sign = 1
或者 exiting.set()
,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。
[ThreadPoolExecutor-1_0] 0 start
[ThreadPoolExecutor-1_1] 1 start
[ThreadPoolExecutor-1_2] 2 start
[ThreadPoolExecutor-1_0] 0 end
[ThreadPoolExecutor-1_0] 3 start
[ThreadPoolExecutor-1_1] 1 end
[ThreadPoolExecutor-1_0] 3 end
[ThreadPoolExecutor-1_0] 4 start
[ThreadPoolExecutor-1_2] 2 end
[ThreadPoolExecutor-1_0] 4 end
[ThreadPoolExecutor-1_1] 5 skip
[ThreadPoolExecutor-1_2] 6 skip
......
[ThreadPoolExecutor-1_0] 18 skip
[ThreadPoolExecutor-1_2] 17 skip
[ThreadPoolExecutor-1_1] 19 skip
3. 接收 KeyboardInterrupt 异常取消线程任务
from concurrent.futures import ThreadPoolExecutor
import time
import threading
import random
def worker(n):
threading_name = threading.current_thread().name
print(f'[{threading_name}] {n} start')
time.sleep(random.randint(1, 6)) # 随机休眠1-6s
print(f'[{threading_name}] {n} end')
with ThreadPoolExecutor(max_workers=6) as executor:
threadPool = []
for q in range(20):
task = executor.submit(worker, q)
threadPool.append(task) # task 任务加入 threadPool
try:
while not list(reversed(threadPool))[0].done(): # 判断最后一个任务是否取消/完成
# 代替 wait(threadPool, return_when=FIRST_EXCEPTION)
# 利用 while 堵塞且能够接收 KeyboardInterrupt 异常
time.sleep(2)
except KeyboardInterrupt:
# 接收 KeyboardInterrupt 并取消剩余线程任务
print('KeyboardInterrupt')
for task in reversed(threadPool):
task.cancel()
提交给线程池的每个线程任务 task
加入 threadPool
中,方便后续对 task 进行操作。当 for
循环内的 task
全部提交后,线程会再后台运行,而进程运行至 while
中堵塞,直至 threadPool
中最后一个线程是否 .done()
。若进程堵塞在 while
中接收到 Ctrl+C
的 KeyboardInterrupt
异常,则从后往前取消 threadPool
中所有任务,达到中止目的。
[ThreadPoolExecutor-0_0] 0 start
[ThreadPoolExecutor-0_1] 1 start
[ThreadPoolExecutor-0_2] 2 start
[ThreadPoolExecutor-0_1] 1 end
[ThreadPoolExecutor-0_1] 3 start
[ThreadPoolExecutor-0_0] 0 end
[ThreadPoolExecutor-0_0] 4 start
[ThreadPoolExecutor-0_1] 3 end
[ThreadPoolExecutor-0_1] 5 start
KeyboardInterrupt
[ThreadPoolExecutor-0_2] 2 end
[ThreadPoolExecutor-0_0] 4 end
[ThreadPoolExecutor-0_1] 5 end