multiprocessing 自学笔记
2019-12-27 本文已影响0人
NanaCti
multiprocessing 具有与 threading 模块相似API。
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
启动方法
- multiprocessing.set_start_method('')
设置启动方法有 1) spawn 2) fork 3) forkserver 三种选项 - get_context()
使用多种启动方式
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
进程通信
- 队列 Queue()
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
- 管道 Pipe()
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。
进程锁
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
共享数据
修改共享数据时需要上锁
- Value
- Array
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
pool 进程池
if __name__ == '__main__':
# 启动4个工作进程
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# 按任意顺序打印相同的号码
for i in pool.imap_unordered(f, range(10)):
print(i)
# 单个异步操作
res = pool.apply_async(f, (20,)) # 只在一个进程中运行
print(res.get(timeout=1)) # prints "400"
# 单个异步操作
res = pool.apply_async(os.getpid, ()) # 只在一个进程中运行
print(res.get(timeout=1)) # 打印该进程的PID
# 单个异步结合列表推导式
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
# 暂停10秒
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("遇到了一个多进程超时错误")