进程间通信
2019-04-19 本文已影响0人
遇明不散
进程间通信
- 进程空间相对独立,资源无法相互获取,此时在不同进程间通信需要专门方法
- 进程间通信就是在不同的进程间进行数据的传输
进程间通信方式
文件
不推荐,文件和磁盘交互慢,数据不安全
管道
- 在内存中开辟一个管道空间,生成管道操作对象,对多个进程可见,多个进程使用"同一个"管道对象进行操作即可实现通信
- 管道允许在进程之间按先进先出的方式传送数据,是进程间通信的一种常见方式
管道基本函数
fd1,fd2 = Pipe(duplex = True)
# 功能:创建一个管道
# 参数:默认为True,表示双向管道;设置为False则为单向管道
# 返回值:返回两个管道流对象,分别表示管道的两端
# 如果是双向管道,都可以读写
# 如果是单向管道,则第一个对象只能接受,第二个对象只能发送,即fd1只读,fd2只写
fd.recv()
# 功能:从管道读取信息
# 返回值:读取到的内容
# 如果管道为空则阻塞
fd.send(data)
# 功能:向管道发送内容
# 参数:要发送的内容
# 可以发送python数据类型,字符串、数字、列表等多种类型的数据
# 一次recv()只能接受一次send()的内容
管道实现示例
# pipe.py
from multiprocessing import Process,Pipe
import os,time
# 双向管道,fd1,fd2均可收发
fd1,fd2 = Pipe()
# 单向管道,fd1只能接收,fd2只能发送
# fd1,fd2 = Piple(False)
def fun(name):
time.sleep(2)
fd1.send('hello %s' % name)
# 若为单向管道
# fd2.send('hello %s' % name)
# 发送其他数据类型
# fd1.send([1,2,3,4,5])
jobs = []
for i in range(5):
p = Process(target = fun,args = (i,))
jobs.append(p)
p.start()
for i in range(5):
data = fd2.recv()
# 若为单向管道
# data = fd1.recv()
print(data)
for i in jobs:
i.join()
消息队列
- 在内存中开辟一个队列数据结构模型,用来存放消息
- 任何拥有队列对象的进程都可以进行消息的存放和取出
- 取出顺序和存入顺序保持一致,即先进先出
消息队列基本函数
q = Queue(maxsize = 0)
# 功能:创建消息队列
# 参数:表示最多存放多少消息,默认表示根据内存分配存储
# 返回: 队列对象
q.put(data,[block,timeout])
# 功能:向队列存储消息
# 参数:data 要存的内容,可以使用数字、列表、字符串等
# block 默认队列满时会阻塞,设置为False则非阻塞
# timeout 超时时间
data = q.get([block,timeout])
# 功能:获取队列消息
# 参数:block 默认队列满时会阻塞,设置为False则非阻塞
# timeout 超时时间
# 返回值: 返回取出的内容
# 判断队列是否为满
q.full()
# 判断队列是否为空
q.empty()
# 判断队列中消息数量
q.qsize()
# 关闭队列
q.close()
消息队列示例
# queue1.py 基本函数使用
from multiprocessing import Queue
from time import sleep
# 创建消息队列对象
q = Queue(3)
q.put(1)
sleep(0.5)
print(q.empty()) # False
q.put(2)
print(q.full()) # False
q.put(3)
print(q.qsize()) # 3
# q.put(4,True,2) # 超时等待
print(q.get()) # 1
q.close() # 关闭队列
# queue2.py 进程间通信
from multiprocessing import Queue,Process
from time import sleep
q = Queue()
def fun1():
sleep(1)
q.put({'a':1,'b':2})
def fun2():
sleep(2)
print('Get message {} from another process'.format(q.get()))
p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()
q.close()
共享内存
- 在内存空开辟一块空间,对多个进程可见,进程可以写入输入,但是每次写入的内容会覆盖之前的内容
- 由于没有对内存进行格式化的修饰,所以存取快效率高
共享内存基本函数
obj = Value(ctype,obj)
# 功能:开辟共享内存空间
# 参数:ctype 要存储的数据类型,C语言类型
# obj 共享内存的初始化数据
# 返回:共享内存对象
obj = Array(ctype,obj)
# 功能:开辟一个共享内存空间
# 参数:ctype 要存储的数据类型,C语言类型
# obj 初始化存入的内容,比如列表,字符串,要求列表中的数据类型相同
# 如果是整数则表示开辟空间的个数
# 返回值:返回共享内存对象,可以通过遍历获取每个元素的值
# 如果存入的是字符串,obj.value 表示字符串的首地址
共享内存实现实例
# value.py
from multiprocessing import Process,Value
import time
import random
# 创建共享内存
money = Value('i',2000)
def deposite():
for i in range(100):
time.sleep(0.05)
money.value += random.randint(1,200)
def withdraw():
for i in range(100):
time.sleep(0.05)
money.value -= random.randint(1,180)
d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()
print('Remaining:',money.value)
# array.py
from multiprocessing import Array,Process
import time
# 创建共享内存
# share_memory = Array('i',[1,2,3,4,5])
# share_memory = Array('u',['a','b','c'])
# share_memory = Array('c',b'hello')
# 创建共享内存,开辟5个整形空间
share_memory = Array('i',5)
def fun():
for i in share_memory:
print(i)
# 修改共享内存中的值
share_memory[1] = 1
p = Process(target = fun)
p.start()
p.join()
# 只有字符串的时候可以这样用
# print(share_memory.value)
管道、消息队列、共享内存比较
比较 | 管道 | 消息队列 | 共享内存 |
---|---|---|---|
开辟空间 | 内存中 | 内存中 | 内存中 |
读写方式 | 双向/单向 | 先进先出 | 覆盖之前内容 |
效率 | 一般 | 一般 | 较快 |
是否需要同步互斥 | 不需要 | 不需要 | 需要 |
应用 | 多用于父子进程 | 广泛灵活 | 需要注意进行互斥操作 |
信号
- 一个进程向另一个进程发送一个信号来传递某种讯息,接受者根据接收到的信号进行相应的行为
- 进程间通信中唯一种异步通信方法
-
kill -l
查看系统信号 -
kill -signame PID
给进程号PID
的进程发送signame
信号
同步执行与异步执行
- 同步执行:按照顺序逐句执行,一步完成再做下一步
- 异步执行:在执行过程中利用内核记录延迟发生或者准备处理的事件,这样不影响应用层的持续执行,当事件发生时再由内核告知应用层处理
常见信号
-
SIGHUP
连接断开 -
SIGINT
Ctrl-C -
SIGQUIT
Ctrl-\ -
SIGTSTP
Ctrl-Z -
SIGKILL
终止一个进程 -
SIGSTOP
暂停一个进程 -
SIGALRM
时钟信号 -
SIGCHLD
子进程状态改变时给父进程发出
信号基本函数
import signal
import os
os.kill(pid,sig)
# 功能:向指定的进程发送信号
# 参数:pid 目标进程
# sig 要发送的信号
signal.alarm(sec)
# 功能:向自身发送时钟信号SIGALRM,终止进程运行,非阻塞
# 参数:sec 时钟时间,表示多少秒后自身会收到这个信号
# 进程中只能有一个时钟,第二个会覆盖第一个时间
signal.pause()
# 功能:阻塞等待接收一个信号
信号处理函数
import signal
signal.signal(signum,handler)
# 功能:处理信号
# 参数:signum 要处理的信号
# handler 信号的处理方法
# SIG_DFL 表示使用默认的方法处理
# SIG_IGN 表示忽略这个信号
# func 传入一个函数,表示用指定函数处理
# def func(sig,frame)
# sig:捕获到的信号
# frame:信号对象
-
signal
函数也是一个异步处理信号函数 -
SIGKILL
和SIGSTOP
不能被signal
函数处理 - 僵尸进程的信号处理方案,父进程中
signal(SIGCHLD,SIG_IGN)
信号实现示例
# signal.py
import signal
import os
from time import sleep
def handler(sig,frame):
if sig == signal.SIGALRM:
print('Get a alarm signal')
elif sig == signal.SIGINT:
# print('Get a ctrl+c signal')
os.kill(os.getpid(),signal.SIGKILL)
signal.alarm(5)
# 默认信号处理方式
# signal.signal(signal.SIGALRM,signal.SIG_DFL)
# 忽略信号
# signal.signal(signal.SIGALRM,signal.SIG_IGN)
# 信号处理函数处理
signal.signal(signal.SIGALRM,handler)
signal.signal(signal.SIGINT,handler)
while True:
sleep(1)
print('Waiting for a signal ...')
信号量
给定一个数量,对多个进程可见,且多个进程都可以操作。进程通过对数量多少的判断执行各自的行为。
信号量基本函数
from multiprocessing import Semaphore
sem = Semaphore(num)
# 功能:创建信号量
# 参数:信号量初始值
# 返回:信号量对象
# 获取信号量值
sem.get_value()
# 将信号量减1,当信号量为0会阻塞
sem.acquire()
# 将信号量加1
sem.release()
信号量实现示例
# sem.py
from multiprocessing import Semaphore,Process
import os
from time import sleep
sem = Semaphore(3)
def fun():
print('进程%d等待信号量'%os.getpid())
# 消耗一个信号量
sem.acquire()
print('进程%d消耗信号量'%os.getpid())
sleep(3)
# 增加一个信号量
sem.release()
print('进程%d添加信号量'%os.getpid())
jobs = []
for i in range(4):
p = Process(target = fun)
jobs.append(p)
p.start()
for i in jobs:
i.join()
print(sem.get_value())
同步和互斥
临界资源
对多个进程或者线程都可见的资源,容易产生争夺,这类资源称为临界资源
临界区
对临界资源进行操作的代码区域称为临界区
解决资源争夺
同步或者互斥
同步
同步是一种合作关系,为完成某种任务而建立的多个进程或者线程之间的协调关系、次序等,传递消息告知资源占用情况
互斥
互斥是一种制约关系,当一个进程或线程进入临界区后会进行枷锁操作,此时其他进程(线程)无法进如临界区,只有当该进程(线程)使用后进行解锁,其他人才可以使用。这种技术往往是通过阻塞完成。
进程间同步互斥方法
事件
e.wait()
产生一种阻塞,直到e
被set
之后才结束阻塞
e.set()
将e
进行set
操作,wait
不再阻塞
e.is_set()
判断e
是否被设置的状态
e.clear()
将e
变成没有设置的状态
锁
lock = Lock()
创建锁进程
lock.acquire()
给临界区上锁
lock.release()
给临界区解锁
with lock
实现加锁解锁
具体实现上,acquire()
为一个条件阻塞函数,当有任意一个进程先进行了acquire
操作后,其他进程再企图进行acquire
操作时就会阻塞,直到lock
对象被release
后其他进程才可进行下一次acquire
操作