2020-09-09 Python的进程间通信
进程状态和调度
image在程序运行的时候,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行,阻塞
1. 就绪状态: 当进程分配到CPU以外的所有的资源,只要获取的处理器的使用权就可立即执行,这时就是进入了就绪状态
2. 执行/运行状态: 当程序已经获取了处理器的使用权,其程序正在处理器上运行,此时的进程状态称为执行/运行状态
3. 阻塞状态: 当程序由于等待某个事件而无法执行的时候,便放弃处理器而处于阻塞状态.引起阻塞的原因可能有多种:例如:等待IO的完成,申请缓存区不能满足,等待信号等
Python中使用多进程
multiprocessing模块
multiprocessing.Process 介绍
Process模块是一个创建进程的模块,借助这个模块可以创建进程
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'egon',)
kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
name为子进程的名称
方法介绍
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,
使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,
并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是
为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
window中使用Process注意事项:
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__
==‘__main__
’ 判断保护起来,import 的时候 ,就不会递归运行了。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 19:51'
from multiprocessing import Process
import time
def task(name):
print('{} 开始运行'.format(name))
time.sleep(2)
print('{} 运行结束'.format(name))
# windows 开子进程要在__name__ == '__mian__'下
# 因为开子进程会重新加载父进程的内容
if __name__ == '__main__':
p1 = Process(target=task, args=('t1',))
p2 = Process(target=task, kwargs={'name': 't2'})
p1.start()
p2.start()
print('---主进程---')
查看主进程的id和父进程的id
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:01'
import time
from multiprocessing import Process
import os
def task(name):
print('{} 开始运行'.format(name))
print('子进程id:{},父进程id:{}'.format(os.getpid(),os.getppid()))
time.sleep(2)
print('{} 运行结束'.format(name))
if __name__ == '__main__':
print('主进程id:',os.getpid())
p = Process(target=task,args=('进程1',))
p.start()
print('---主进程运行结束---')
join方法,优雅的结束子进程
可以实现父进程优雅的等待子进程的结束,join表示的是主进程等待当前的子进程结束.
并且进程之间的数据是独立的
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:09'
from multiprocessing import Process
x = 100
def change():
global x
x = 10
print('子进程修改了x,子进程结束了')
print(x)
if __name__ == '__main__':
p = Process(target=change)
p.start()
p.join() # 优雅的实现了主进程等待子进程先结束,然后再结束,避免了僵尸进程的出现.
print(x)
print('主进程结束')
结果:
使用继承Process类重写run方法的方式创建多进程
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:16'
from multiprocessing import Process
import os
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(os.getpid(),os.getppid())
print('{} 正在打游戏..'.format(self.name))
if __name__ == '__main__':
p1 = MyProcess('fioman')
p2 = MyProcess('jingjing')
p3 = MyProcess('mengmeng')
p1.start() # start会自动调用run
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print('主线程')
守护进程
父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束
主进程创建守护进程
1.守护进程会在主进程结束的时候而终止
2.守护进程将无法再创建子进程
守护进程的创建方式:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:25'
from multiprocessing import Process
import os,time
class MyProcess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(),os.getppid())
time.sleep(2)
print("子进程结束")
if __name__ == '__main__':
p = MyProcess('子进程')
p.daemon = True # 一定要在p.start()之前设置,一旦设置这个属性,表示这个进程是守护进程,主进程结束的时候,它也会结束
p.start()
print('主进程结束')
运行结果,主进程结束了,子进程由于在休眠,所以后面的子进程结束并不会打印
Process中的其他方法
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/6 20:32'
from multiprocessing import Process
import time
import random
class MyProcess(Process):
def __init__(self,person):
self.name = person
super().__init__()
def run(self):
print('{} 正在打游戏'.format(self.name))
time.sleep(random.randrange(1,5))
print('{} 还在打游戏'.format(self.name))
p1 = MyProcess('Fioman')
p1.start()
p1.terminate() # 关闭进程,不会马上关闭,所以is_alive立刻查看结果可能还是存货
print(p1.is_alive()) # True
print('开始')
time.sleep(1)
print(p1.is_alive()) # False
互斥锁
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会因为竞争而引发数据安全或顺序混乱问题。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:00'
from multiprocessing import Process
import time
import random
def task1():
print('这是 task1 任务'.center(30, '-'))
print('task1 进了洗手间')
time.sleep(random.randint(1, 3))
print('task1 办事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手间')
def task2():
print('这是 task2 任务'.center(30, '-'))
print('task2 进了洗手间')
time.sleep(random.randint(1, 3))
print('task2 办事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手间')
def task3():
print('这是 task3 任务'.center(30, '-'))
print('task3 进了洗手间')
time.sleep(random.randint(1, 3))
print('task3 办事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手间')
if __name__ == '__main__':
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p2.start()
p3.start()
通过加锁来控制
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:00'
from multiprocessing import Process, Lock
import time
import random
# 生成一个互斥锁
mutex_lock = Lock()
def task1(lock):
lock.acquire() # 获取到锁对象,当其他进程想要获取锁对象的时候,就会进入等待状态
print('这是 task1 任务'.center(30, '-'))
print('task1 进了洗手间')
time.sleep(random.randint(1, 3))
print('task1 办事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手间')
lock.release()
def task2(lock):
lock.acquire()
print('这是 task2 任务'.center(30, '-'))
print('task2 进了洗手间')
time.sleep(random.randint(1, 3))
print('task2 办事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手间')
lock.release()
def task3(lock):
lock.acquire()
print('这是 task3 任务'.center(30, '-'))
print('task3 进了洗手间')
time.sleep(random.randint(1, 3))
print('task3 办事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手间')
lock.release()
if __name__ == '__main__':
p1 = Process(target=task1, args=(mutex_lock,))
p2 = Process(target=task2, args=(mutex_lock,))
p3 = Process(target=task3, args=(mutex_lock,))
p1.start()
p2.start()
p3.start()
上面虽然通过加锁控制住了进程因为争夺资源而出现的同事对同一资源的访问,但是程序又变成串行了.这样确实浪费了时间,但是确保了程序数据的安全性.
互斥锁模拟抢票
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 10:27'
import json,time
from multiprocessing import Process,Lock
def buy_ticket(i,lock):
lock.acquire()
with open('ticket','r',encoding='utf-8') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0:
dic['ticket'] -= 1
print('{} 买到了票'.format(i))
else:
print('{} 没有买到票'.format(i))
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket,args=(i,lock))
p.start()
一点思考
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,在牺牲速度的前提下保证数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
- 效率低(共享数据基于文件,而文件是硬盘上的数据)
- 需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
- 效率高(多个进程共享一块内存的数据)
- 帮我们处理好锁问题。
mutiprocessing模块中为我们提供了一个IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。
进程间的同步控制之信号量(Semaphore)
信号量是用来解决什么问题的?
信号量是用来解决,当我们的某段资源只希望有N个进程访问的时候,用的一种方式.相当于规定了这个进程,只可以拿到几个锁资源.如果超过了,限定的个数,进程就会进入等待阻塞状态,直到有的进程释放了锁,这个等待的进程才可以获取锁的使用权.信号量有效的控制了同一时间某段资源同时只能开启的进程的最大数,有效的防止了程序因为开启太多的进程,造成内存浪费的情况.
信号量的使用方式:
1. 创建信号量实例对象
2. 将对象以参数的形式传递给进程.
3. 在事件函数中,开始的时候获取锁,结束的时候,释放锁.当它的锁释放的时候,新的等待的进程就会获取到锁.
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 11:32'
from multiprocessing import Process, Semaphore
import time
import random
def task(name, sem):
sem.acquire()
print('{} 正在执行任务'.format(name))
time.sleep(random.randrange(1, 3))
print('{} 任务执行完毕'.format(name))
sem.release()
if __name__ == '__main__':
sem = Semaphore()
for i in range(20):
p = Process(target=task, args=(i, sem))
p.start()
进程间控制之事件
事件的原理就是: 在主进程中创建一个事件对象Event(),然后用参数的形式传递给子进程.在子进程中通过一个进程的事件状态改变,来决定另一个进程是阻塞状态还是继续执行.
比如:
车和红绿灯事件. 红灯的时候,将事件的is_set()设置为False,默认就是False.当是绿灯的时候将其设置为True,然后车就可以根据事件的状态来决定是否可以通行.类似于通知的形式,用一个共同的标志位,来通过改变标志位,通知相应的进程是运行还是阻塞.
事件常用的方法
obj.is_set():默认值为False,事件是通过此方法的bool值去标示wait()的阻塞状态
obj.set():将is_set()的bool值改为True
obj.clear():将is_set()的bool值改为False
obj.wait():is_set()的值为False时阻塞,否则不阻塞
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 12:21'
import time,random
from multiprocessing import Process,Event
def car(e,i):
if not e.is_set():
print('{} 号车在红灯处等待'.format(i))
e.wait() # 这里会阻塞,直到得到一个 事件状态为True的信号事件.
print('{} 号车通过'.format(i))
def light(e):
while True:
if e.is_set():
e.clear() # 两秒之后,变成红灯
print('红灯亮了')
else:
e.set() # 两秒之后,变成绿灯
print('绿灯亮了')
time.sleep(1)
if __name__ == '__main__':
e = Event()
lig = Process(target=light,args=(e,))
lig.start()
for i in range(10):
c = Process(target=car,args=(e,i))
c.start()
time.sleep(random.random())
进程间通信(队列和管道multiprocessing.Queue和multiprocess.Pipe)
进程间通信之队列 MultiProcessing.Queue
进程间通信
IPC Inter-Process Communication
队列
创建一个进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递.
Queue 队列常用的方法
Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,
默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,
用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。
如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。
timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,
队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。
也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
其他方法
q.close()
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,
但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的
数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.cancel_join_thread()
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
q.join_thread()
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,
此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
队列常用的方法举例:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:12'
from multiprocessing import Queue
q = Queue(3)
# put get put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
# q.put(4,block=False) # 如果队列已经满了,程序会被阻塞到这里
# 如果这里设置非阻塞,但是当队列已经满的时候会报错
try:
q.put_nowait(3) # 这里相当于是强制不阻塞的往队列中塞东西,当塞不进去的时候就会报错
except Exception as e:
print('队列已经满了.')
# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不要再放了
print(q.full()) # True
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 同put方法一样,如果队列为空的时候,会阻塞
try:
q.get_nowait()
except Exception as e:
print('队列已经空了')
print(q.empty())
使用队列进行进程间通信的简单示例:
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:21'
from multiprocessing import Process, Queue
import time
def task(q):
q.put([time.ctime(), 'from Eva', 'hello'])
if __name__ == '__main__':
q = Queue() # 创建一个没有大小限制的队列对象
p = Process(target=task, args=(q,))
p.start()
print(q.get())
p.join()
再看看一个较为复杂的例子
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 13:27'
from multiprocessing import Process, Queue
import time
import os
def inputQ(q):
info = str(str(os.getpid()) + 'put:' + str(time.ctime()))
q.put(info)
def outputQ(q):
info = q.get()
print('获取到的存放的数据是: {}'.format(info))
if __name__ == '__main__':
q = Queue(3)
record1 = [] # 记录存放数据的进程
record2 = [] # 记录取出数据的进程
# 往队列中存放数据的进程
for i in range(10):
process = Process(target=inputQ, args=(q,))
process.start()
record1.append(process)
# 从队列中取数据的进程
for i in range(10):
process = Process(target=outputQ, args=(q,))
process.start()
record2.append(process)
# 让主进程等待它的结束,使用join
for i in record1:
i.join()
for i in record2:
i.join()
生产者消费模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用该模型?
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模型?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费模型
生产者消费模型,完美的解决了生产者和消费者供需不平衡的问题.
当生产速度比较快的时候,可以用增加消费者进程数量的方法平衡它们之间的生产和消费关系.
当消费速度比较快的时候,可以用增加生产者进程数量的方法平衡它们之间的生产和消费关系.
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,Queue
import time,random
def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一个空,生产者已经完事了')
break
print('\033[31m{}消费了{}\033[0m'.format(name,food))
time.sleep(random.random())
def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生产了{}{}'.format(name,food,i)
print(f)
q.put(f)
if __name__ == '__main__':
q = Queue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','馒头',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.start()
c2.start()
# 让主程序可以等待子进程的结束.
p1.join()
p2.join()
# 生产者的进程结束,这里需要放置两个空值,供消费者获取,用来判断已经没有存货了
q.put(None)
q.put(None)
print('主程序结束..........')
使用队列写生产者消费者模型的时候,注意事项是当生产者生产完毕的时候,一定要通知消费者,通过放一个特殊的值的方式.如果有多个消费者,就要放置多个标志结束的值.这样消费者的进程,才能知道什么时候结束.这种属于边生产边消费的模型,不能使用is_empty()来判断数据队列是否为空,因为消费者在获取数据的时候,有可能生产者正在生产中.所以不能用这种方式来决定消费者的进程是否结束,必须通过一个特殊的标识值来通知消费者,生产者已经没有数据了.你可以结束消费进程了.已经没有东西可以给你消费了.
注意: 使用队列的时候一个缺点,如果有N个消费者,就要放置N个值去通知消费者,有点费劲.
这里我们引入一个JoinableQueue()
创建可连接的共享进程队列,它们也是队列,但是这些队列比较特殊.它们可以允许消费者通知生产者项目已经被成功处理.注意,这里必须是生产者生产完了,生产者的进程被挂起,等到消费者完全消费的时候,生产者进程就结束,然后主程序结束.将消费者进程设置为守护进程,这样的话,主进程结束的时候,消费进程也就结束了.
JoinableQueue()比普通的Queue()多了两个方法:
q.task_done()
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
# encoding:utf-8
__author__ = 'Fioman'
__time__ = '2019/3/7 14:06'
from multiprocessing import Process,JoinableQueue
import time,random
def consumer(q,name):
while True:
food = q.get()
if food is None:
print('接收到了一个空,生产者已经完事了')
break
print('\033[31m{}消费了{}\033[0m'.format(name,food))
time.sleep(random.random())
q.task_done() # 向生产者发送信号,表示消费了一个
def producer(name,food,q):
for i in range(10):
time.sleep(random.random())
f = '{}生产了{}{}'.format(name,food,i)
print(f)
q.put(f)
q.join() # 当生产者生产完毕之后,会在这里阻塞.等待消费者的消费.
if __name__ == '__main__':
q = JoinableQueue(20)
p1 = Process(target=producer,args=('fioman','包子',q))
p2 = Process(target=producer,args=('jingjing','馒头',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(q,'mengmeng'))
c2 = Process(target=consumer,args=(q,'xiaoxiao'))
c1.daemon = True # 将消费者设置为守护进程
c2.daemon = True # 将消费者设置为守护进程
c1.start()
c2.start()
# 让主程序可以等待生产子进程的结束.
p1.join()
p2.join()
print('主程序结束..........')