Python 进程学习

2018-07-10  本文已影响0人  vckah

Python 中多进程是由 multiprocessing 模块提供的

import time
from multiprocessing import Process

def test(i):
    print("子进程开始")
    print(i)
    time.sleep(2)
    print("子进程结束")

if __name__ == "__main__":
    p_ls = []
    for i in range(3):
        p  = Process(target=test, args=(i, ))
        p.start()
        p_ls.append(p)
    for i in range(3):
        p_ls[i].join(()

还有一种方法,使用类:

from multiprocessing import Process

class MyProcess(Process):
    def __init__(self):
        super().__init__()
    def run():
        pass

p = MyProcess()
p.start()
p.join()

终止一个进程:p.terminate()
判断一个进程是否存活: p.is_alive
查看一个进程的 id:p.pid
查看进程对象名称:p.name
守护进程会随着主进程的代码执行完毕而结束
p.start() 之前设置 p.daemon = True
加锁。涉及到数据改动的时候,目的是保证数据安全。

from multiprocessing import  Lock
lock = Lock()
lock.acquire()
lock.release()

进程同步控制 ---- 锁,信号量,事件

from multiprocessing import Process, Semaphore

def test(i, sem):
    sem.acquire()
    pass
    sem.release()

if __name__ =="__main__":
    sem = Semaphore(4)
    for i in range(4):
        p = Process(target=test, args=(i, sem,))
        p.start()
from multiprocessing import Event

e = Event()
e.is_set()      --> False
e.set()         --> 将事件的状态设置为 True
e.wait()        --> 依据 is_set 的值来决定是否阻塞,相当于监视 is_set 的状态,然后根据其状态来做某些事情
e.clear()       --> 将事件的状态设置为 False 阻塞

例如一个简单的红绿灯事件

import time
import random
from multiprocessing import Process, Event

def car(e, i):
        if not e.is_set():
            print('car %s 在等待'%(i) )
            e.wait()      --> 阻塞,直到得到事件状态变为 True
        print('car %s 通过'%i)

def light(e):
    while True:
        if e.is_set():        --> 开始时是 False,所以绿灯亮
            e.clear()      
            print("红灯亮了")
        else:
            e.set()            --> 设置为 True,红灯亮
            print("绿灯亮了")
        time.sleep(2)

if __name__ == "__main__":
    e = Event()
    traffic = Process(target=light, args=(e, ))
    traffic.start()
    for i in range(10):
        c = Process(target=cars, args=(e, i))
        c.start()
        time.sleep(random.randint())

进程间通信 IPC ----- 队列和管道

# 先进先出
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.get()
q.full()        -->> 队列是否满了
q.empty()    -->> 判断队列是否空
q.get_nowait()    ->> 如果队列空了,那么它会报错

消费者与生产者

import time
import random
from multiprocessing import Process, Queue

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 2))
        food = "产品%s"%i
        print('生产者生产了一个东西')
        q.put(food)

def consumer(q):
    while True:
        food = q.get()
        if food is None:
            break
        print(food)
        time.sleep(random.random())

if __name__ == "__main__":
    q = Queue()
    p = Process(target=produce, args=(q,))
    p.start()
    c = Process(target=consume, args=(q, ))
    c.start()
    p.join()
    q.put(None)

但是上面的代码有一个问题,就是 q.put(None) 的频繁,有几个消费者,那么就需要向队列中 put 几个值,这种情况可以使用 JoinableQueue

from multiprocessing import JoinableQueue
def producer(q):
    xxxx
    q.join()    -->>  阻塞,直到队列中的所有数据全被处理完毕
def consumer(q):
     xxxx
    q.task_done()
if __name__ == "__main__":
    q = JoinableQueue()
    c.daemon = True
from multiprocessing import Pipe
conn1, conn2 = Pipe()
conn1.send('12345')
conn2.recv()

进程池

主要是为了效率问题,开启单个进程会耗费很多资源。进程池就是先创建好多个进程,然后从进程池中取到进程,提高操作系统调度进程的利用率。一般 CPU 个数 + 1 是最大开启进程的数量。

from multiprocessing import Pool

def func(n):
    for i in range(10):
        n += 1
        print(n)

if __name__ == "__main__":
    pool = Pool(5)
    pool.map(func, <可迭代对象>range(100))
    # map 是自带 close 和 join 的,最后一次返回结果
    res_lst = []
# 还有一个 apply 方法
    for i in range(20):
        p.apply(func, args=(i, ))        --> 同步提交的方式
        res = p.apply_sync(func, args=(i, ))      --> 异步提交方式
        # res.get()     --> 阻塞等待结果
        # 如果需要不阻塞,那么可以这样做
        res_lst.append(res)
    # 最后可以得到结果
    for res in res_lst:
        print(res.get())
    # 与 apply_async 配合使用
    p.close()    --> 结束进程池接受任务
    p.join()     --> 感知进程池中的任务结束

进程池实现 socket 服务端

import socket
from multiprocessing import Pool

def func(conn):
    conn.send(b'hello')
    msg = conn.recv(1024).decode('utf-8')
    print(msg)
    con.close()

if __name__ == "__main__":
     p = Pool(5)
    sk = socket.socket()
    sk,bind(('127.0.0.1', 8000))
    sk.listen()
    while True:
        conn, addr = sk.accept()
        p.apply_async(func, args=(conn, ))
    sk.close()

另外进程池还有回掉函数,意思就是在 任务执行完毕后执行指定的回调函数,任务返回的数据作为回调函数的参数。回调函数的参数来源只有进程返回的数据。回调函数是在主进程中执行的

def func1():
    pass
    return x
def func2(x):
    pass
    return 

if __name__ == "__main__":
    pool = Pool(5)
    p.apply_async(func, args=(), callback=func2)

回调函数经常用于爬虫,因为网络延迟和下载很耗时。
例如

import requests
from multiprocessing import Pool

def get(url):
    res = requests.get(url)
    if res.status_code == 200:
        return url, res.content
def call_back(args):
    url, content =  args
    print(url, len(content))

if __name__ == "__main__":
    url_lst = []
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get, args=(url,), callback=call_back) 
    p.close()
    p.join()
上一篇下一篇

猜你喜欢

热点阅读