python并发之二:一篇文章搞懂python多进程(理论+实践
python多进程
在上一篇博文中,我详细的讲解了python的多线程。在这篇博文中,我将对python的多进程部分进行细致讲解。
前言
在上一篇博文中我们提到,由于python多线程无法调度到处理器多个核心上并行执行,所以它更适用于IO密集型程序。而对于计算密集型的程序,就是python多进程大展身手的场合了。
由于windows系统和linux系统在进程实现的方式上相差很多,这里笔者不进行详细阐述,感兴趣的读者可以自行查阅。在这篇博文中,笔者所有的代码将全部运行在windows系统中。针对程序在linux平台上的不同实现的问题,读者可以给笔者留言。
进程的创建
python的多进程相关模块主要封装在multiprocessing库中,windows系统中父进程新建并启动一个子进程的代码及运行结果如下:
# windows版创建子进程的代码
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('子进程的进程号 %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('父进程 %s 开始运行' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('子进程开始运行')
# 启动子进程
p.start()
# 等待子进程结束
p.join()
print('父进程结束')
运行结果为
父进程 422356 开始运行
子进程开始运行
子进程的进程号 test (453796)...
父进程结束
我们可以从运行结果中看到由进程号为422356的父进程创建进程号为453796的子进程并启动。程序中的start()函数和join()函数和多线程中的相应函数作用完全相同,分别表示启动进程和等待该进程运行结束,详细内容可以参考笔者的上一篇博客
利用进程池创建进程
在需要创建大量进程的场合,上文提到的方式就有些力不从心了。所以我们可以选择进程池的方式来创建大量进程。程序以及输出如下:
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('运行任务 %s (%s)...' % (name, os.getpid()))
start = time.time()
# 每个进程在[0, 3]秒之间随机睡眠一段时间
time.sleep(random.random() * 3)
end = time.time()
print('任务 %s 运行时间 %0.2f 秒.' % (name, (end - start)))
if __name__=='__main__':
print('父进程 %s.' % os.getpid())
# pool默认为CPU核的数量,可以指定
p = Pool()
for i in range(6):
p.apply_async(long_time_task, args=(i,))
print('等待所有子进程结束...')
p.close()
p.join()
print('所有子进程已运行结束.')
输出结果为:
父进程 393580.
等待所有子进程结束...
运行任务 0 (461268)...
运行任务 1 (355412)...
运行任务 2 (466252)...
运行任务 3 (454400)...
运行任务 4 (431068)...
运行任务 5 (452984)...
任务 0 运行时间 0.86 秒.
任务 2 运行时间 1.34 秒.
任务 5 运行时间 1.41 秒.
任务 3 运行时间 2.10 秒.
任务 1 运行时间 2.58 秒.
任务 4 运行时间 2.67 秒.
所有子进程已运行结束.
结果是显而易见的,6个进程同时启动,并在随机运行不同的时间后,依次结束运行,值得一提的是我在这里利用进程池启动了6个进程,而这个进程池实际最多能同时启动8个进程,8也是我的处理器核心数量,所以你如果在创建进程池时不给Pool()传入参数时,默认为你的处理器核心数量。
多进程运行计算密集型程序
口说无凭,笔者在这一部分展示一个多进程运行计算密集型程序的实例,笔者使用的是8核处理器,而在本程序中,我启动了8个进程分别运行计算密集型程序。程序以及利用windows的资源管理器观察CPU利用率如下
from multiprocessing import Pool
import os, time
def cal_intensity_task(name):
print('运行任务 %s (%s)...' % (name, os.getpid()))
start = time.time()
# 每个进程在[0, 3]秒之间随机睡眠一段时间
i = 1
while(True):
i = i ** 2
end = time.time()
print('任务 %s 运行时间 %0.2f 秒.' % (name, (end - start)))
if __name__=='__main__':
print('父进程 %s.' % os.getpid())
# pool默认为CPU核的数量,可以指定
p = Pool()
for i in range(8):
p.apply_async(cal_intensity_task, args=(i,))
print('等待所有子进程结束...')
p.close()
p.join()
print('所有子进程已运行结束.')

我们可以看到,此时8个进程正在充分“压榨”CPU,而这一场景是大家在python多线程场合下无法看到的。python多进程的重要性也就显而易见了
多个进程之间的消息传递
说到并发程序,就不得不说同步,这一部分笔者将讲解一个消息传递的实例
from multiprocessing import Process, Queue
import time, random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print('把值 %s 加入消息队列' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
value = q.get()
print('从消息队列中获取值 %s ' % value)
if q.empty():
break
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
pw.join()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pr.join()
运行结果如下:
把值 A 加入消息队列
把值 B 加入消息队列
把值 C 加入消息队列
从消息队列中获取值 A
从消息队列中获取值 B
从消息队列中获取值 C
这里使用到了一个在多进程情况下安全的数据容器——消息队列,它可以保证两个进程对于队列中的元素的存取不会彼此干扰,因此可以保证程序的正确运行。
总结
以上就是笔者总结的python多进程的知识,结合上一篇python多线程的部分,希望读者也能对python的多线程与多进程有一定了解,并且读者可以与java多线程相比较,可以更容易理解相关的技术实现。