python 多进程(一)multiprocessing.Pro
该文章基于 python3.7,部分功能只有python3.7能实现
一、 进程模块multiprocessing
多进程可以实现多个程序的并行,充分利用计算机的资源,在不同的平台/操作系统上,python
实现多进程的方式不同
在Unix/Linux 中,通过fork()
调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
运行结果如下:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows没有fork调用,上面的代码在Windows上无法运行,我们可以使用multiprocessing
模块,其封装了底层复制进程的过程,Unix 和 Windows 上都可以运行。
根据不同的平台, multiprocessing
支持三种启动进程的方法。
-
spawn
父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的run()
方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。
可在Unix和Windows上使用。 Windows上的默认设置。 -
fork
父进程使用os.fork()
来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。
只存在于Unix。Unix中的默认值。
- forkserver
程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用os.fork()
是安全的。没有不必要的资源被继承。
可在Unix平台上使用,支持通过Unix管道传递文件描述符。
二、进程对象Process
进程模块multiprocessing
中包含与进程相关的异常、同步、通信等等相关,其中Process
封装了进程对象的相关API,是一个子进程的物化实现,封装了子进程状态与管理相关功能。
- 如何创建一个子进程对象
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
应始终使用关键字参数调用构造函数,而不是 None
, None
, p1
……这样来传入参数,否则可能造成不可知错误。
-
group 应该始终是
None
;它仅用于兼容性考虑 -
target 传入一个可调用对象。它默认为
None
,这里传入的是子进程的运行函数。 - name 是进程名称,仅仅具有标识作用,并不会改变操作系统中的进程名称。
-
args 是目标调用的参数元组,也就是
target
调用函数的参数 -
kwargs 是目标调用的关键字参数字典,也是
target
调用函数的参数 -
daemon 将进程
daemon
标志设置为True
或False
。如果是None
(默认值),则该标志将从创建的进程继承
示例:创建子进程,并显示子进程和父进程的的进程ID
from multiprocessing import Process
import os
def child_main(name):
print('I am', name, 'process id:', os.getpid())
print('parent process:', os.getppid())
if __name__ == '__main__':
print('main process id:', os.getpid())
p = Process(target=child_main, args=('bob',))
p.start()
p.join()
打印结果
main process id: 2649
I am bob process id: 2650
parent process: 2649
从程序运行来看
p = Process(target=child_main, args=('bob',))
创建了一个子进程,该子进程执行child_main
方法,方法的参数为'bob'
,p
为该子进程对象的物化实现,封装子进程相关的功能。
-
Process
对象的常用方法
-
start()
启动进程,并调用run()
方法。 -
run()
表示进程活动的方法,在可以在子类中重载此方法。标准run()
方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 args 和 kwargs 参数中获取顺序和关键字参数。 -
join([timeout])
如果可选参数 timeout 是None
(默认值),则该方法将阻塞,直到调用join()
方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回None
。检查进程的exitcode
以确定它是否终止。 -
name
进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。 -
is_alive()
返回进程是否还活着。
粗略地说,从start()
方法返回到子进程终止之前,进程对象仍处于活动状态。 -
daemon
进程的守护标志,一个布尔值。这必须在start()
被调用之前设置。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,不允许守护进程创建子进程。否则,守护进程会在子进程退出时终止其子进程。 另外,这些 不是 Unix守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。 -
pid
返回进程ID。在生成该进程之前,这将是 None 。 -
exitcode
子进程退出代码。如果进程尚未终止,这将是 None 。负值 -N 表示孩子被信号 N 终止。 -
terminate()
终止进程。 在Unix上,这是使用SIGTERM
信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等
请注意,进程的后代进程将不会被终止 —— 它们将孤儿进程。参考进程基础 -
kill()
与terminate()
相同,但在Unix上使用SIGKILL
信号
3.7 新版功能. -
close()
关闭Process
对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发ValueError
。一旦close()
成功返回,Process
对象的大多数其他方法和属性将引发ValueError
。
3.7 新版功能.
注意 start()
、 join()
、 is_alive()
、 terminate()
和 exitcode
方法只能由创建进程对象的进程调用。
使用实例 1,利用多进程实现timeout
函数
在使用 爬虫 的相关技术中,有很多方法都具有 timeout
参数,也可以利用多进程实现timeout
函数,思路如下:
将运行函数放到子进程中运行,在主进程中等待子进程执行join([timeout])
,然后判断子进程的状态 is_alive()
,如果为真,说明子进程还在运行,已经超过我们的限制时间,则打断
子进程,并在主进程中抛出异常。
实现如下:
这里在函数 run_limit()
中包含一个辅助函数,我们也可以将辅助函数作为一个参数,可以实现对所有函数转成timeout
函数
from multiprocessing import Process
import time
def run_limit(timeout=5):
def fun():
i = 0
while True:
time.sleep(1)
i += 1
print(i)
p = Process(target=fun, )
p.start()
p.join(timeout=timeout)
if p.is_alive():
p.terminate()
raise TimeoutError(f'运行超时{timeout}!')
if __name__ == '__main__':
run_limit()
注意,这里每执行该函数都会启动一个子进程,较普通函数有较大的消耗。
使用实例 2,利用守护进程daemon
实现心跳机制
背景:
心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
在Linux系统中,计算机刚启动时只有一个进程,PID为1,名字为init
(centos6系统)或者systemd
(centos7系统),systemd
进程通过复制自身进程启动了其它进程,后面再复制出整个计算机进程,所以进程被设计成独立的,也就是说父进程关闭了,子进程照样能正常运行,这是非常必要的,不至于其中一个进程挂了,让它的子孙后代进程都挂掉。
但如果我们在主进程中开启一个子进程用于向远方服务器报告,这种进程间的独立就不符合我们的期望,而daemon
参数可以实现主进程与子进程的绑定,主进程结束,守护进程的子进程
也结束,如下
from multiprocessing import Process
import time
def child_main():
while True:
print('i am live')
time.sleep(3)
if __name__ == '__main__':
p = Process(target=child_main, daemon=True)
p.start()
time.sleep(10)
print('main end')
这里没有使用join()
方法等待子进程,所以在运行10秒后主进程结束,
由于参数daemon=True
,所以子进程在主进程结束后也跟着结束了。