Python3用multiprocessing简单实现多进程并行

2018-10-10  本文已影响0人  陈葡萄

1、简单例子代码

# -*- coding: utf-8 -*-

import multiprocessing
from multiprocessing import Pool, Queue

# 全局Pool方便在已产生结果数据时中止进程
global MP_POOL

# class的方法须放在top级函数中间接调用
def parallel_test(i):
    tt = TT()
    return tt.test(i)

# 供并行进程异步回调的函数
def async_callback(d):
    global MP_POOL
    print(">>>>>>>>>>>>>callback d: {0}<<<<<<<<<<<<<<".format(d))
    if d == 13:
        print("=============terminate pool=============")
        MP_POOL.terminate()

# 被并行调用的class
class TT(object):
    def __init__(self):
        pass
    
    def test(self, i):
        print("----------------in TT.test i: {0}----------------".format(i))
        return i


class SimpleParallel(object):
    def __init__(self):
        self.myqueue = Queue()

    def run(self):
        self.myqueue.put(10)
        self.myqueue.put(15)
        self.myqueue.put(13)
        self.myqueue.put(20)
        self.myqueue.put(21)
        self.myqueue.put(30)
        self.myqueue.put(11)
        self.myqueue.put(19)
        self.myqueue.put(25)

        try:
            self.async_track()
        except Exception as e:
            pass

    def async_track(self):
        global MP_POOL
        # Pool不指定processes参数,默认也会用cpu_count初始化
        MP_POOL = Pool(processes=multiprocessing.cpu_count())
        while not self.myqueue.empty():
            i = self.myqueue.get()
            # apply_async的func参数只能传递top级module中定义的函数
            MP_POOL.apply_async(parallel_test, (i, ), callback=async_callback)

        # Pool的close调用后,就不能再加入更多的进程了
        MP_POOL.close()
        # Pool的join阻塞主进程,直到Pool中的进程全部结束
        MP_POOL.join()


if __name__ == "__main__":
    print("start...")

    stt = SimpleParallel()
    stt.run()

    print("...done")

2、输出结果:
多次运行,输出可能会有不同

start...
----------------in TT.test i: 10----------------
----------------in TT.test i: 15----------------
>>>>>>>>>>>>>callback d: 10<<<<<<<<<<<<<<
----------------in TT.test i: 13----------------
>>>>>>>>>>>>>callback d: 15<<<<<<<<<<<<<<
>>>>>>>>>>>>>callback d: 13<<<<<<<<<<<<<<
----------------in TT.test i: 21----------------
=============terminate pool=============
----------------in TT.test i: 20----------------
...done

3、踩到的坑

修改~/.bash_profile添加环境变量
vi ~/.bash_profile

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

source ~/.bash_profile

修改launch.json添加环境变量:

"env": {
    "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"
}

自定义class的方法需要转换成支持pickle的对象,或者放在top级函数中间接调用(比如例子中tt.test)

4、参考资料
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
https://blog.csdn.net/u010483897/article/details/82222743
https://blog.csdn.net/wen61995120/article/details/80319077
https://blog.csdn.net/u010483897/article/details/82221340
http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
https://blog.csdn.net/dutsoft/article/details/70336462
https://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-multiprocessing-pool-map

上一篇下一篇

猜你喜欢

热点阅读