Python开发(人工智能/大数据/机器学习)

37.Python编程:多进程multiprocessing

2018-08-10  本文已影响9人  TensorFlow开发者

前言

前面我们了解了计算机中的CPU多任务原理,知道了进程和线程的概念。今天就通过代码看Python中多进程是如何实现的。

Linux\Unix多进程:os.fork()

Linux\Unix操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

fork()返回结果:子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

import os
pid = os.fork()
if pid == 0:
    print("这个是子进程,pid是{}, 父进程pid是{}".format(os.getpid(), os.getppid()))
else:
    print("这个是父进程,pid是{}, 创建了个子进程pid是{}".format(os.getpid(), pid))

提示:os.fork()只在Linux\Unix系统下可以使用。Mac系统是基于Unix系统开发的,所以也支持这种写法。如果你是在Windows系统下做的Python开发,那么运行上面代码,会报错:

Traceback (most recent call last):
  File "F:/python_projects/multi_process/process_thread.py", line 2, in <module>
    pid = os.fork()
AttributeError: module 'os' has no attribute 'fork'

由于Windows没有fork调用,但Python是支持跨平台的,所以Windows上用Python编写多进程的程序,就需要借用multiprocessing模块了。

跨平台多进程:multiprocessing

multiprocessing模块提供了一个Process类来代表一个进程对象。通常可导入模块:
from multiprocessing import Process

示例:

from multiprocessing import Process
from time import sleep
import os


# 定义函数
def my_run(arg):
    print("子进程启动")
    sleep(5)
    print("子进程结束")


if __name__ == "__main__":
    print("主进程已经开始启动:{}".format(os.getpid()))

    # 创建子进程
    p = Process(target=my_run, args=("子进程",))

    # 启动子进程
    p.start()

    # join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步
    p.join()

    print("父进程结束")

运行结果:

主进程已经开始启动:26308
子进程启动
子进程结束
父进程结束

提示:上面例子中join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。说白了就是,父进程的结束时间是要等子进程结束后才结束。如果不调用此方法,运行结果如下:

主进程已经开始启动:960
父进程结束
子进程启动
子进程结束

父进程的结束时间不再受子进程结束的影响。父进程可能在子进程结束之前就结束,也可能在子进程结束后才结束,具体情况要看各自进程的具体操作。本例中,如果不调用p.join()方法,但子进程有耗时操作sleep(5),而父进程中没有耗时的操作,所以父进程优先于子进程结束了。

说明:
1,创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
2.创建子进程时,切不可这样创建:p = Process(target=my_run("子进程"))。为了方便对比,将两种写法拿出来:

# 上面例子中的创建写法
p = Process(target=my_run, args=("子进程",))

# 不推荐的创建写法
p = Process(target=my_run("子进程"))

参数的传入方式不同,上面例子中通过args=传入,形如:args=("子进程",),而错误的写法是直接通过指定target=时就一起传入,形如:target=my_run("子进程")

错误的创建形式虽然可以创建子进程成功,但会导致:join()方法无论是否调用,都会出现进程同步的现象。即:父进程的结束时间是要等子进程结束后才结束。

线程池Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程。multiprocessing模块提供了一个Pool类来代表一个进程对象。通常可导入模块:
from multiprocessing import Pool

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

下面介绍一下multiprocessing模块下的Pool类下的几个方法:

1.apply()
函数原型:apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

2.apply_async
函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])apply()用法一致,但它是非阻塞的且支持结果返回后进行回调。

3.map()
函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

4.map_async()
函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的。其有关事项见apply_async

5.close()
关闭进程池pool,使其不在接受新的任务。

6.terminal()
结束工作进程,不在处理未处理的任务。

7.join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

下面我们看一个简单的multiprocessing.Pool类的例子:

from multiprocessing import Pool
from time import time, sleep
import random


def perform_task(id):
    begin = time()
    print("【{}号】子进程【开始】执行任务".format(id))
    sleep(random.random() * 5)
    print("【{}号】子进程【结束】执行任务".format(id))
    end = time()
    cost = end - begin
    print("【{}号】子进程执行任务耗时:{}".format(id, cost))

if __name__ == "__main__":
    pool_count = 5
    pool = Pool(pool_count)
    print("进程池准备就绪")
    print("多进程开始执行任务,等待结束...")
    for i in range(pool_count):
        pool.apply_async(perform_task, args=(i,))
    pool.close()
    pool.join()

    print("所有进程池中的任务完成")

运行结果:

进程池准备就绪
多进程开始执行任务,等待结束...
【0号】子进程【开始】执行任务
【1号】子进程【开始】执行任务
【2号】子进程【开始】执行任务
【3号】子进程【开始】执行任务
【4号】子进程【开始】执行任务
【0号】子进程【结束】执行任务
【0号】子进程执行任务耗时:0.3483567237854004
【3号】子进程【结束】执行任务
【3号】子进程执行任务耗时:1.0760245323181152
【4号】子进程【结束】执行任务
【4号】子进程执行任务耗时:1.3666496276855469
【1号】子进程【结束】执行任务
【1号】子进程执行任务耗时:3.833340644836426
【2号】子进程【结束】执行任务
【2号】子进程执行任务耗时:4.259188890457153
所有进程池中的任务完成

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果:

Process to write: 23192
Put A to queue...
Process to read: 2824
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

小结

本文主要学习了python中的多进程Process及实现跨平台的多进程multiprocessing,进程池Pool、通过Queue进程间通信。


更多了解,可关注公众号:人人懂编程


微信公众号:人人懂编程
上一篇下一篇

猜你喜欢

热点阅读