python 多进程
2017-02-10  本文已影响0人 
第八共同体
1.简介:
multiprocessing 是一个和threading模块相似的包,支持生成多进程。multiprocessing 包提供包括本地和远端的并发性,通过使用多进程有效避免了因GIL的限制。由此, multiprocessing包允许程序员充分利用多进程。该包可运行在Unix和Windows上。举例:
def f(x):
    return x*x
def create_pro_pool():
    # 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
    # 由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
    p = Pool()
    for i in range(5):
        print(p.map(f, [1, 2, 3]))
    p.close()
    p.join()
if __name__ == '__main__':
    # create_child_pro()
    # create_child_pro_pool()
    # pros_communication()
    create_pro_pool()
结果输出:
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
在multiprocessing中, 进程的产生是通过创建一个Process类并调用他的start()方法。举个例子:
import os
import time
import random
from multiprocessing import Process, Pool, Queue
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())
def create_child_pro():
    # 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
    # join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'
if __name__ == '__main__':
    create_child_pro()
    # create_child_pro_pool()
    # pros_communication()
    # create_pro_pool()
运行结果:
Parent process 13032.
Process will start.
Run child process test (11900)...
Process end.
进程间通信方式(Queue, Pipe):
import os
import time
import random
from multiprocessing import Process, Pool, Queue
# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print 'Get %s from queue.' % value
def pros_communication():
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()
if __name__ == '__main__':
    # create_child_pro()
    # create_child_pro_pool()
    pros_communication()
    # create_pro_pool()
    # pros_communication_pipe()
运行结果:
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Pipe
import os
import time
import random
from multiprocessing import Process, Pool, Queue, Pipe
def f_pipe(conn):
    conn.send([42, None, 'hello'])
    conn.close()
def pros_communication_pipe():
    # The Pipe() function returns a pair of connection objects connected by a pipe which 
    # by default is duplex (two-way).
    # The two connection objects returned by Pipe() represent the two ends of the pipe.
    #  Each connection object has send() and recv() methods (among others).
    parent_conn, child_conn = Pipe()
    print parent_conn, child_conn
    p = Process(target=f_pipe, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
if __name__ == '__main__':
    # create_child_pro()
    # create_child_pro_pool()
    # pros_communication()
    # create_pro_pool()
    pros_communication_pipe()
运行结果:
[42, None, 'hello']
进程间锁机制:
import os
import time
import random
from multiprocessing import Process, Pool, Queue, Pipe, Lock
def f_lock(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
def pros_communication_lock():
    lock = Lock()
    for num in range(10):
        Process(target=f_lock, args=(lock, num)).start()
if __name__ == '__main__':
    # create_child_pro()
    # create_child_pro_pool()
    # pros_communication()
    # create_pro_pool()
    # pros_communication_pipe()
    pros_communication_lock()
运行结果:
('hello world', 3)
('hello world', 2)
('hello world', 1)
('hello world', 0)
('hello world', 4)
('hello world', 6)
('hello world', 5)
('hello world', 7)
('hello world', 8)
('hello world', 9)