python 多进程

2020-09-10  本文已影响0人  天空蓝雨

基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。

from multiprocessing import Process
def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('傻逼',))
    p.start()
    p.join()
    print("运行结束")
>> hello 傻逼
运行结束

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

p = Process(target=f, args=('傻逼',))
    p.start()
    # time.sleep(1)
    print("运行结束")

>> 运行结束
hello 傻逼

主进加个 sleep

结果:
hello 傻逼 
运行结束 (大约等了1秒后,才输出 运行结束)

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #  获取父进程的 pid  (process id)
    print('process id:', os.getpid())


def f(name):
    info('function f') # use info
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

>> main line
module name: __main__
parent process: 14212
process id: 1940
function f
module name: __mp_main__
parent process: 1940
process id: 15020
hello bob

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940

视频笔记:
多进程:使用大致方法:

参考:进程通信(pipe和queue)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

def b(x):
    # return 20
    print("child process")
if __name__ == "__main__":
    p  =Pool(processes=2)
    print(p.map(b, [1,2,3,4]))
>>
child process
child process
child process
child process
[None, None, None, None]

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

Pipe() 函数返回一个由管道连接的连接对象

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send()recv() 方法

练习过程中出现的问题:
from multiprocessing import Pool
pool = Pool()
def search(a):

    # while a<1000000:
    #     a+=1
    print("子进程结束")
    return 1
if __name__ == "__main__":
    # pool = Pool()
    a = time.time()
    result = pool.map(search, [1,2,10])
    b = time.time()
    print(b-a)

报错:

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

参考 :https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == "main": 下面初始化搞定。
结果:

子进程结束
子进程结束
子进程结束
0.7355780601501465

这个肯定有解释的


spawn.py 的解释

测试多进程计算效果:
进程池运行:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, [1,2,10])
    b = time.time()
    print(b-a)

结果:

子进程结束
子进程结束
子进程结束
16.176724195480347
红框是程序运行的状态

普通计算:

class aa():

    def search(self, a):

        while a<100000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    # result = pool.map(c.search, [1,2,10])
    c.search(1)
    c.search(2)
    c.search(10)

    b = time.time()
    print(b-a)

我们同样传入 1 2 10 三个参数测试:

子进程结束
子进程结束
子进程结束
24.153281211853027
红框为执行计算的过程,看出来很平稳

其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:

子进程结束
子进程结束
子进程结束
3.1451985836029053

多进程:

子进程结束
子进程结束
子进程结束
2.4246084690093994

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面

class xxviews():
    def  post(request):
        music = Music()
        result = music.run()
    return ...

Music 类:

pool = Pool()
class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
             pool.map(....)     

直接报错:


好像是说 子进程自己不可以在创建子进程了

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:


class Music():
        def __init__(self):
                pass
        def run(self):
                result  = self.search(...)
                pass
        def search(self, keyword, target_src):
              pool = Pool()      
              pool.map(....)       
              pass
成功运行了

前台也能显示搜索的音乐结果了


总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考:https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.map(c.search, ((1,3),))

报错:

   return self._map_async(func, iterable, mapstar, chunksize).get()
  File "E:\Program\python\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
TypeError: search() missing 1 required positional argument: 'b'

参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。

最简单的方法:
使用 starmap 而不是 map

class aa():

    def search(self, a,b):

        while a<10000000:
            a+=1
        print("子进程结束")
        return 1
if __name__ == "__main__":
    pool = Pool()
    c = aa()
    a = time.time()
    result = pool.starmap(c.search, ((1,3),))
    b = time.time()
    print(b-a)

结果:
子进程结束
1.8399453163146973
成功拿到结果了

关于map 和 starmap 不同的地方看源码:

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are 
expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

Pool(processes=os.cpu_count()-1)

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

RuntimeError: Queue objects should only be shared between processes through inheritance

解决:
Manager().Queue() 代替 Queue()

from multiprocessing import Manager, Queue
queue = Manager().Queue()
#  queue =   Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True

 current_search = "稍等" if music_current_search.empty() else 
music_current_search.get()
上一篇下一篇

猜你喜欢

热点阅读