multiprocessing进程池错误修复:TypeError

2020-02-14  本文已影响0人  越大大雨天

昨天在类中使用进程池时出现了TypeError: can't pickle generator objects的错误, 这是进程池无法序列化生成器对象的错误。

我尝试使用了concurrent.futures模块下的ProcessPoolExecutor进程池方法,依然是不行的。

这里使用以下代码大致还原一下遇到的错误:

from multiprocessing import Pool


class T:
    def __init__(self, a):
        self.a = a
        self.b = (i for i in range(5))

    def add(self, n):
        for i in self.b:
            print(i + n)

    def run(self):
        p = Pool()
        p.map(self.add, self.a)


if __name__ == '__main__':
    a = [i for i in range(5)]
    s = T(a)
    s.run()

当运行上述代码时,将会出现以下的具体错误:

Traceback (most recent call last):
  File "/Users/wangyue/Python/txt_to_MongoDB/mult_process.py", line 22, in <module>
    s.run()
  File "/Users/wangyue/Python/txt_to_MongoDB/mult_process.py", line 15, in run
    p.map(self.add, self.a)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects

问题分析

问题的直接原因是我在init构造函数内定义了一个生成器对象,不管你最终多进程的实例方法是否有用到该生成器对象,都会导致报错,因为它在构造函数内:

    def __init__(self, a):
        self.a = a
        # 这是一个生成器对象
        self.b = (i for i in range(5)

由于我使用进程池调用的方法是一个类对象中中的实例方法,而进程池无法pickle序列化该self对象,造成错误。

解决方案

若要对类实例方法调用进程池,则不能在_init_中定义生成器对象,你可以转换为一个列表。
若一定需要一个生成器对象,则可以在_init_之外使用@property定义一个属性,其余代码也均不需要做修改:

from multiprocessing import Pool


class T:
    def __init__(self, a):
        self.a = a
        # 修改方式1:将对象转换为列表
        # self.b = [i for i in range(5)]

    # 修改方式2:定义属性方法获取生成器对象
    @property
    def b(self):
        return (i for i in range(5))

    def add(self, n):
        for i in self.b:
            print(i + n)

    def run(self):
        p = Pool()
        p.map(self.add, self.a)


if __name__ == '__main__':
    a = [i for i in range(5)]
    s = T(a)
    s.run()

修改后的多进程代码可以正常运行了。

其余修改方式:

貌似可以使用dill模块实现pickle模块无法序列化的一些对象,不知道是否包括生成器对象。不过目前没做研究,有兴趣的小伙伴可以再研究研究。

希望能帮到你。

上一篇 下一篇

猜你喜欢

热点阅读