
Python Concurrency: Multiprocessing

2021-05-16  xiaogp

Abstract: Python multiprocessing

A process is a concrete unit of work managed by the operating system. Each process has its own independent memory space, and it serves as the container for threads. When a program starts, one process is launched by default and loaded into memory; within that process, a default thread is also started to execute the program's code.


Creating a Multiprocess Program

In Python, process support is provided by the multiprocessing module. As with threads, there are two ways to create a process.

Similar to thread creation, you can instantiate the Process class directly, passing the worker function via the target argument; once the process object is created, call its start method to launch it.

(1) Creating a process by instantiating the class

Define the worker function run_proc and pass it in when instantiating Process. Then call the start method on each instance to launch the process, and call join so that the main process waits for its child processes to finish before exiting.

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a", ))
    p2 = Process(target=run_proc, args=("b", ))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

The output is as follows. Note that the main process and each child process have different pids, and that after calling join the main process blocks until all child processes have finished.

main process: 12611
a start, pid:12612
b start, pid:12613
a end, pid:12612
b end, pid:12613
main process end...

(2) Creating a process by subclassing Process

Define a custom process class that inherits from the built-in Process class and override its run method:

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


class MyProcess(Process):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        run_proc(self.sid)


if __name__ == '__main__':
    p1 = MyProcess("a")
    p2 = MyProcess("b")
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

The output is as follows, identical in effect to instantiating Process directly:

main process: 14859
a start, pid:14860
b start, pid:14861
a end, pid:14860
b end, pid:14861
main process end...

Creating a Process Pool

Once a project reaches a certain scale, frequently creating and destroying processes or threads becomes very expensive, which is when a thread pool or process pool is called for. Python provides two modules that implement process pools: concurrent.futures and multiprocessing.Pool.

(1)multiprocessing.Pool

Pool takes an argument that sets the maximum number of processes in the pool. It offers several methods, chiefly apply, apply_async, map, map_async, starmap, and starmap_async. The differences: apply and map block the caller while the _async variants return immediately; apply submits one call at a time, map and map_async take an iterable of single arguments, and starmap and starmap_async unpack each item of the iterable into multiple arguments.

In addition, after submitting all the tasks, call close so that the pool accepts no further work, then call join to block the main process until every worker process in the pool has finished.

The following code demonstrates each method.

1. apply

import time
import datetime
import os
from multiprocessing import Pool


def run_proc(sid):
    time.sleep(3)
    print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply(run_proc, (i,))
    pool.close()
    pool.join()

The output is as follows. apply submits a single call at a time and blocks until it returns, so the tasks in the list can only run one after another.

a end, pid:25260, time:2021-05-16 18:15:08
b end, pid:25261, time:2021-05-16 18:15:11
c end, pid:25262, time:2021-05-16 18:15:14
d end, pid:25263, time:2021-05-16 18:15:17

2. apply_async

Simply change the apply call on the Pool to apply_async:

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply_async(run_proc, (i,))
    print("main process")
    pool.close()
    pool.join()

The output is as follows. The tasks now run in parallel without blocking one another, and the main process is not blocked either: main process is printed before the workers finish.

main process
a end, pid:25721, time:2021-05-16 18:17:33
b end, pid:25722, time:2021-05-16 18:17:33
c end, pid:25723, time:2021-05-16 18:17:33
d end, pid:25724, time:2021-05-16 18:17:33

3. map

Change the Pool method to map:

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

The output is as follows. map also runs the tasks in parallel, but it blocks the main process: only after map has finished the whole task list does the main process continue.

a end, pid:26939, time:2021-05-16 18:21:43
d end, pid:26942, time:2021-05-16 18:21:43
b end, pid:26940, time:2021-05-16 18:21:43
c end, pid:26941, time:2021-05-16 18:21:43
main process

4. map_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    res = pool.map_async(run_proc, jobs)
    print("main process...")
    pool.close()
    pool.join()
    print(res.get())

The output is as follows. The effect is similar to map, except that the main process is not blocked: main process... is printed first, and the results are retrieved afterwards with get() on the returned AsyncResult.

main process...
c end, pid:21401, time:2021-05-16 21:41:54
b end, pid:21400, time:2021-05-16 21:41:54
a end, pid:21399, time:2021-05-16 21:41:54
d end, pid:21402, time:2021-05-16 21:41:54
['a', 'b', 'c', 'd']

5. starmap

Change the worker function to take two parameters, switch the Pool call to starmap, and make each item in the task list a pair of arguments:

def run_proc(sid, other):
    time.sleep(3)
    print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid) + str(other)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.starmap(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

The output is as follows. The worker function correctly receives both arguments, and the main process is blocked until all tasks finish.

b end, other:r, pid:27642, time:2021-05-16 18:24:52
a end, other:b, pid:27641, time:2021-05-16 18:24:52
c end, other:d, pid:27643, time:2021-05-16 18:24:52
d end, other:a, pid:27644, time:2021-05-16 18:24:52
main process

Let's check whether map supports two arguments:

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

It fails immediately: map passes each list item as a single positional argument, so the worker function is missing an argument.

TypeError: run_proc() missing 1 required positional argument: 'other'

6. starmap_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    res = pool.starmap_async(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
    print(type(res.get()))

The output is as follows. Compared with starmap, the main process is not blocked; everything else is the same.

main process
a end, other:b, pid:17123, time:2021-05-16 21:28:30
b end, other:r, pid:17124, time:2021-05-16 21:28:30
d end, other:a, pid:17126, time:2021-05-16 21:28:30
c end, other:d, pid:17125, time:2021-05-16 21:28:30
<class 'list'>

Differences Between Multithreading and Multiprocessing

Both multiprocessing and multithreading use parallelism to improve system throughput. The difference lies in how memory is laid out at run time: each process has its own independent address space, while threads within a process share one.

In large compute clusters, a multiprocess program is typically distributed across different machines to work cooperatively, while inside each machine's processes, multiple threads work in parallel.
