进程间通信

2019-04-19  本文已影响0人  遇明不散

进程间通信

进程间通信方式

文件

不推荐,文件和磁盘交互慢,数据不安全

管道
管道基本函数
fd1,fd2 = Pipe(duplex = True)
# 功能:创建一个管道
# 参数:默认为True,表示双向管道;设置为False则为单向管道
# 返回值:返回两个管道流对象,分别表示管道的两端
#      如果是双向管道,都可以读写
#      如果是单向管道,则第一个对象只能接受,第二个对象只能发送,即fd1只读,fd2只写

fd.recv()
# 功能:从管道读取信息
# 返回值:读取到的内容
# 如果管道为空则阻塞

fd.send(data)
# 功能:向管道发送内容
# 参数:要发送的内容
# 可以发送python数据类型,字符串、数字、列表等多种类型的数据
# 一次recv()只能接受一次send()的内容
管道实现示例
# pipe.py
from multiprocessing import Process,Pipe
import os,time

# 双向管道,fd1,fd2均可收发
fd1,fd2 = Pipe()
# 单向管道,fd1只能接收,fd2只能发送
# fd1,fd2 = Piple(False)

def fun(name):
    time.sleep(2)
    fd1.send('hello %s' % name)

    # 若为单向管道
    # fd2.send('hello %s' % name)

    # 发送其他数据类型
    # fd1.send([1,2,3,4,5])   

jobs = []

for i in range(5):
    p = Process(target = fun,args = (i,))
    jobs.append(p)
    p.start()

for i in range(5):
    data = fd2.recv()
    # 若为单向管道
    # data = fd1.recv()
    print(data)

for i in jobs:
    i.join()
消息队列
消息队列基本函数
q = Queue(maxsize = 0)
# 功能:创建消息队列
# 参数:表示最多存放多少消息,默认表示根据内存分配存储
# 返回: 队列对象

q.put(data,[block,timeout])
# 功能:向队列存储消息
# 参数:data 要存的内容,可以使用数字、列表、字符串等
#       block 默认队列满时会阻塞,设置为False则非阻塞
#       timeout 超时时间

data = q.get([block,timeout])
# 功能:获取队列消息
# 参数:block 默认队列满时会阻塞,设置为False则非阻塞
#      timeout 超时时间
# 返回值: 返回取出的内容

# 判断队列是否为满
q.full()   

# 判断队列是否为空
q.empty()  

# 判断队列中消息数量
q.qsize()  

# 关闭队列
q.close()  
消息队列示例
# queue1.py 基本函数使用
from multiprocessing import Queue
from time import sleep

# 创建消息队列对象
q = Queue(3)

q.put(1)
sleep(0.5)
print(q.empty())  # False
q.put(2)
print(q.full())   # False
q.put(3)
print(q.qsize())  # 3
# q.put(4,True,2) # 超时等待
print(q.get())    # 1
q.close()         # 关闭队列

# queue2.py 进程间通信
from multiprocessing import Queue,Process
from time import sleep

q = Queue()

def fun1():
    sleep(1)
    q.put({'a':1,'b':2})

def fun2():
    sleep(2)
    print('Get message {} from another process'.format(q.get()))

p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()
q.close()
共享内存
共享内存基本函数
obj = Value(ctype,obj)
# 功能:开辟共享内存空间
# 参数:ctype  要存储的数据类型,C语言类型  
#      obj  共享内存的初始化数据
# 返回:共享内存对象

obj = Array(ctype,obj)
# 功能:开辟一个共享内存空间
# 参数:ctype 要存储的数据类型,C语言类型   
#      obj 初始化存入的内容,比如列表,字符串,要求列表中的数据类型相同
#          如果是整数则表示开辟空间的个数
# 返回值:返回共享内存对象,可以通过遍历获取每个元素的值
#        如果存入的是字符串,obj.value 表示字符串的首地址
共享内存实现实例
# value.py
from multiprocessing import Process,Value
import time
import random

# 创建共享内存
money = Value('i',2000)

def deposite():
    for i in range(100):
        time.sleep(0.05)
        money.value += random.randint(1,200)

def withdraw():
    for i in range(100):
        time.sleep(0.05)
        money.value -= random.randint(1,180)

d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()

print('Remaining:',money.value)

# array.py
from multiprocessing import Array,Process
import time

# 创建共享内存
# share_memory = Array('i',[1,2,3,4,5])
# share_memory = Array('u',['a','b','c'])
# share_memory = Array('c',b'hello')

# 创建共享内存,开辟5个整形空间
share_memory = Array('i',5)

def fun():
    for i in share_memory:
        print(i)
    # 修改共享内存中的值
    share_memory[1] = 1

p = Process(target = fun)
p.start()
p.join()

# 只有字符串的时候可以这样用
# print(share_memory.value)
管道、消息队列、共享内存比较
比较 管道 消息队列 共享内存
开辟空间 内存中 内存中 内存中
读写方式 双向/单向 先进先出 覆盖之前内容
效率 一般 一般 较快
是否需要同步互斥 不需要 不需要 需要
应用 多用于父子进程 广泛灵活 需要注意进行互斥操作
信号
同步执行与异步执行
常见信号
信号基本函数
import signal
import os

os.kill(pid,sig)
# 功能:向指定的进程发送信号
# 参数:pid 目标进程
#      sig 要发送的信号

signal.alarm(sec)
# 功能:向自身发送时钟信号SIGALRM,终止进程运行,非阻塞
# 参数:sec 时钟时间,表示多少秒后自身会收到这个信号
# 进程中只能有一个时钟,第二个会覆盖第一个时间

signal.pause()
# 功能:阻塞等待接收一个信号
信号处理函数
import signal
signal.signal(signum,handler)
# 功能:处理信号
# 参数:signum  要处理的信号
#      handler 信号的处理方法 
#        SIG_DFL  表示使用默认的方法处理
#        SIG_IGN  表示忽略这个信号
#        func     传入一个函数,表示用指定函数处理
#          def func(sig,frame)
#              sig:捕获到的信号
#              frame:信号对象
信号实现示例
# signal.py
import signal
import os
from time import sleep

def handler(sig,frame):
    if sig == signal.SIGALRM:
        print('Get a alarm signal')
    elif sig == signal.SIGINT:
        # print('Get a ctrl+c signal')
        os.kill(os.getpid(),signal.SIGKILL)

signal.alarm(5)

# 默认信号处理方式
# signal.signal(signal.SIGALRM,signal.SIG_DFL)

# 忽略信号
# signal.signal(signal.SIGALRM,signal.SIG_IGN)

# 信号处理函数处理
signal.signal(signal.SIGALRM,handler)
signal.signal(signal.SIGINT,handler)

while True:
    sleep(1)
    print('Waiting for a signal ...')
信号量

给定一个数量,对多个进程可见,且多个进程都可以操作。进程通过对数量多少的判断执行各自的行为。

信号量基本函数
from multiprocessing import Semaphore

sem = Semaphore(num)
# 功能:创建信号量
# 参数:信号量初始值
# 返回:信号量对象

# 获取信号量值
sem.get_value() 

# 将信号量减1,当信号量为0会阻塞
sem.acquire() 

# 将信号量加1
sem.release() 
信号量实现示例
# sem.py
from multiprocessing import Semaphore,Process
import os
from time import sleep

sem = Semaphore(3)

def fun():
    print('进程%d等待信号量'%os.getpid())

    # 消耗一个信号量
    sem.acquire()
    print('进程%d消耗信号量'%os.getpid())

    sleep(3)

    # 增加一个信号量
    sem.release()
    print('进程%d添加信号量'%os.getpid())

jobs = []

for i in range(4):
    p = Process(target = fun)
    jobs.append(p)
    p.start()

for i in jobs:
    i.join()

print(sem.get_value())

同步和互斥

临界资源

对多个进程或者线程都可见的资源,容易产生争夺,这类资源称为临界资源

临界区

对临界资源进行操作的代码区域称为临界区

解决资源争夺

同步或者互斥

同步

同步是一种合作关系,为完成某种任务而建立的多个进程或者线程之间的协调关系、次序等,传递消息告知资源占用情况

互斥

互斥是一种制约关系,当一个进程或线程进入临界区后会进行枷锁操作,此时其他进程(线程)无法进如临界区,只有当该进程(线程)使用后进行解锁,其他人才可以使用。这种技术往往是通过阻塞完成。

进程间同步互斥方法

事件

e.wait() 产生一种阻塞,直到eset之后才结束阻塞
e.set()e进行set操作,wait不再阻塞
e.is_set() 判断e是否被设置的状态
e.clear()e变成没有设置的状态

lock = Lock() 创建锁进程
lock.acquire() 给临界区上锁
lock.release() 给临界区解锁
with lock 实现加锁解锁
具体实现上,acquire()为一个条件阻塞函数,当有任意一个进程先进行了acquire操作后,其他进程再企图进行acquire操作时就会阻塞,直到lock对象被release后其他进程才可进行下一次acquire操作

上一篇下一篇

猜你喜欢

热点阅读