Python 对ThreadPoolExecutor的简单封装

2022-11-04  本文已影响0人  Good_Nine9
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

class MRTaskManager(object):
    def __init__(self, max_running, max_waiting):
        self._lock = Lock()
        self._num = 0
        self._max_waiting = max_waiting
        self._max_running = max_running
        self._pool = ThreadPoolExecutor(max_workers=max_running)

    def add(self, job):
        if self._num >= self._max_waiting:
            return
        try:
            self._lock.acquire()
            if self._num >= self._max_waiting:
                logger.info('job pool already reach max job count, abort job {}'.format(job))
                return
            self._num += 1
            self._pool.submit(job.run).add_done_callback(self.callback)
            logger.info('submit async mapreduce task, {}'.format(job))
        except Exception as e:
            logger.error("task register failed, err msg = {}".format(e.message))
        finally:
            self._lock.release()

    def callback(self, _):
        self._lock.acquire()
        self._num -= 1
        self._lock.release()

当线程池中任务超过max_waiting的时候,可以丢弃掉任务,防止在锁上排队的任务太多而挤爆内存?

上一篇 下一篇

猜你喜欢

热点阅读