multiprocessing 自学笔记

2019-12-27  本文已影响0人  NanaCti

multiprocessing 具有与 threading 模块相似API。

python文档 - multiprocessing

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

启动方法

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

进程通信

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

进程锁

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

共享数据

修改共享数据时需要上锁

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

pool 进程池

if __name__ == '__main__':
    # 启动4个工作进程
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # 按任意顺序打印相同的号码
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # 单个异步操作
        res = pool.apply_async(f, (20,))      # 只在一个进程中运行
        print(res.get(timeout=1))             # prints "400"

        # 单个异步操作
        res = pool.apply_async(os.getpid, ()) # 只在一个进程中运行
        print(res.get(timeout=1))             # 打印该进程的PID

        # 单个异步结合列表推导式
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]

        # 暂停10秒
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("遇到了一个多进程超时错误")

其他功能与threading一致

上一篇下一篇

猜你喜欢

热点阅读