37.Python编程:多进程multiprocessing
前言
前面我们了解了计算机中的CPU
多任务原理,知道了进程和线程的概念。今天就通过代码看Python中多进程是如何实现的。
Linux\Unix多进程:os.fork()
Linux\Unix
操作系统提供了一个fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
fork()
返回结果:子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork
出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
Python的os
模块封装了常见的系统调用,其中就包括fork
,可以在Python程序中轻松创建子进程:
import os
pid = os.fork()
if pid == 0:
print("这个是子进程,pid是{}, 父进程pid是{}".format(os.getpid(), os.getppid()))
else:
print("这个是父进程,pid是{}, 创建了个子进程pid是{}".format(os.getpid(), pid))
提示:os.fork()
只在Linux\Unix
系统下可以使用。Mac系统是基于Unix系统开发的,所以也支持这种写法。如果你是在Windows
系统下做的Python开发,那么运行上面代码,会报错:
Traceback (most recent call last):
File "F:/python_projects/multi_process/process_thread.py", line 2, in <module>
pid = os.fork()
AttributeError: module 'os' has no attribute 'fork'
由于Windows
没有fork
调用,但Python是支持跨平台的,所以Windows
上用Python编写多进程的程序,就需要借用multiprocessing
模块了。
跨平台多进程:multiprocessing
multiprocessing
模块提供了一个Process
类来代表一个进程对象。通常可导入模块:
from multiprocessing import Process
示例:
from multiprocessing import Process
from time import sleep
import os
# 定义函数
def my_run(arg):
print("子进程启动")
sleep(5)
print("子进程结束")
if __name__ == "__main__":
print("主进程已经开始启动:{}".format(os.getpid()))
# 创建子进程
p = Process(target=my_run, args=("子进程",))
# 启动子进程
p.start()
# join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步
p.join()
print("父进程结束")
运行结果:
主进程已经开始启动:26308
子进程启动
子进程结束
父进程结束
提示:上面例子中join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。说白了就是,父进程的结束时间是要等子进程结束后才结束。如果不调用此方法,运行结果如下:
主进程已经开始启动:960
父进程结束
子进程启动
子进程结束
父进程的结束时间不再受子进程结束的影响。父进程可能在子进程结束之前就结束,也可能在子进程结束后才结束,具体情况要看各自进程的具体操作。本例中,如果不调用p.join()
方法,但子进程有耗时操作sleep(5)
,而父进程中没有耗时的操作,所以父进程优先于子进程结束了。
说明:
1,创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process
实例,用start()
方法启动,这样创建进程比fork()
还要简单。
2.创建子进程时,切不可这样创建:p = Process(target=my_run("子进程"))
。为了方便对比,将两种写法拿出来:
# 上面例子中的创建写法
p = Process(target=my_run, args=("子进程",))
# 不推荐的创建写法
p = Process(target=my_run("子进程"))
参数的传入方式不同,上面例子中通过args=
传入,形如:args=("子进程",)
,而错误的写法是直接通过指定target=
时就一起传入,形如:target=my_run("子进程")
。
错误的创建形式虽然可以创建子进程成功,但会导致:join()
方法无论是否调用,都会出现进程同步的现象。即:父进程的结束时间是要等子进程结束后才结束。
线程池Pool
如果要启动大量的子进程,可以用进程池的方式批量创建子进程。multiprocessing
模块提供了一个Pool
类来代表一个进程对象。通常可导入模块:
from multiprocessing import Pool
Pool
类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool
中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
下面介绍一下multiprocessing
模块下的Pool
类下的几个方法:
1.apply()
函数原型:apply(func[, args=()[, kwds={}]])
该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。
2.apply_async
函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
与apply()
用法一致,但它是非阻塞的且支持结果返回后进行回调。
3.map()
函数原型:map(func, iterable[, chunksize=None])
Pool
类中的map
方法,与内置的map
函数用法行为基本一致,它会使进程阻塞直到结果返回。
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
4.map_async()
函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的。其有关事项见apply_async
。
5.close()
关闭进程池pool,使其不在接受新的任务。
6.terminal()
结束工作进程,不在处理未处理的任务。
7.join()
主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。
下面我们看一个简单的multiprocessing.Pool
类的例子:
from multiprocessing import Pool
from time import time, sleep
import random
def perform_task(id):
begin = time()
print("【{}号】子进程【开始】执行任务".format(id))
sleep(random.random() * 5)
print("【{}号】子进程【结束】执行任务".format(id))
end = time()
cost = end - begin
print("【{}号】子进程执行任务耗时:{}".format(id, cost))
if __name__ == "__main__":
pool_count = 5
pool = Pool(pool_count)
print("进程池准备就绪")
print("多进程开始执行任务,等待结束...")
for i in range(pool_count):
pool.apply_async(perform_task, args=(i,))
pool.close()
pool.join()
print("所有进程池中的任务完成")
运行结果:
进程池准备就绪
多进程开始执行任务,等待结束...
【0号】子进程【开始】执行任务
【1号】子进程【开始】执行任务
【2号】子进程【开始】执行任务
【3号】子进程【开始】执行任务
【4号】子进程【开始】执行任务
【0号】子进程【结束】执行任务
【0号】子进程执行任务耗时:0.3483567237854004
【3号】子进程【结束】执行任务
【3号】子进程执行任务耗时:1.0760245323181152
【4号】子进程【结束】执行任务
【4号】子进程执行任务耗时:1.3666496276855469
【1号】子进程【结束】执行任务
【1号】子进程执行任务耗时:3.833340644836426
【2号】子进程【结束】执行任务
【2号】子进程执行任务耗时:4.259188890457153
所有进程池中的任务完成
对Pool对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
进程间通信
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing
模块包装了底层的机制,提供了Queue、Pipes
等多种方式来交换数据。
我们以Queue
为例,在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
运行结果:
Process to write: 23192
Put A to queue...
Process to read: 2824
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
小结
本文主要学习了python中的多进程Process
及实现跨平台的多进程multiprocessing
,进程池Pool
、通过Queue
进程间通信。
更多了解,可关注公众号:人人懂编程
微信公众号:人人懂编程