第十篇 并发编程2
并发:就是cpu不断的切换,看起来程序是同时程运行的
程序是由cpu来执行的,当程序遇到IO阻塞或者运行时间过长或者有一个优先级更高的程序时,操作系统就会进行cpu切换,将cpu分配到另外一个程序上运行。
在父进程中创建子进程时需要一段时间。首先要申请一块内存空间,然后将父进程的数据拷贝到此内存空间中一份,然后才能运行子进程。
一、守护进程
#什么时候用守护进程?
#首先开子进程的目的就是为了并发执行任务
#如果说该任务的执行周期与主进程的执行周期是一致的
#那么必须把该任务的进程设置为守护进程,子的守护主的。当主进程的代码运行完毕后,子进程也会结束,就像皇帝死了,太监也会跟着陪葬
from multiprocessing import Process
import os
import time
def task():
print('%s is ruuning' %os.getpid())
time.sleep(3)
print('%s is done' % os.getpid())#因为子进程在睡3秒的过程中主进程已经结束了。所以此行代码不会执行
if __name__ == '__main__':
p=Process(target=task,) #创建一个生成子进程的对象
p.daemon = True #必须在p.start()前设置守护进程,把子进程设置为父进程的守护进程
p.start() #开启子进程。就会运行task函数中的代码
time.sleep(1) #如果不睡一秒,还没来得及运行子进程,主进程就结束了,因为设置了守护进程,子进程也跟着结束了
print('主')
执行结果:
12964 is ruuning
主
总结:如果不设置为守护进程。主进程和子进程会并发执行各自的任务,主进程执行完毕了,子进程还在那继续执行任务,直到任务执行结束,如果主的执行完毕了,子进程没有什么意义了,那么在开启子进程之前一定要将子进程设置为守护进程。
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止
执行结果:
main------- #说明主进程执行完毕了,p1还没开启完
456
end456
总结:
#1:守护进程到底什么时候死?:主进程运行完最后一行代码完毕后死
#2:主进程到底什么时候算执行完毕:主进程运行完毕最后一行代码
#3:主进程什么时候应该死掉:等到所有的非守护的子进程都死掉,主才死,因为主进程要给子进程收尸
#4:主进程执行完毕了,是否意味着主进程会立马死掉? 否
#守护进程内不能再开子进程
from multiprocessing import Process
import os
import time
def foo():
print('%s is ruuning' %os.getpid())
time.sleep(3)
print('%s is done' % os.getpid())
def task():
print('%s is ruuning' %os.getpid())
time.sleep(3)
print('%s is done' % os.getpid())
p=Process(target=foo)
p.start()
if __name__ == '__main__':
p=Process(target=task,)
p.daemon = True #必须在p.start()前设置
p.start()
p.join()
print('主')
二、互斥锁(进程)
from multiprocessing import Process
import time,os
def task():
print('%s print 1' %os.getpid())
time.sleep(1)
print('%s print 2' % os.getpid())
time.sleep(1)
print('%s print 3' % os.getpid())
if __name__ == '__main__':
p1=Process(target=task)
p2=Process(target=task)
p3=Process(target=task)
p1.start()
p1.join() #join是把一个进程整个join住,等待这个进程执行完毕后再执行下一个进程
p2.start()
p2.join()
p3.start()
这样就会执行完一个进程后再执行下一个进程,就不会在终端出现打印错乱,否则3个进程并发执行,就会出现打印错乱,说明共享就意味着竞争,竞争就会出现错乱,要想把竞争带来的错乱解决就不能并发的执行。要排着队一个个的执行
模拟抢票的例子
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search():
data=json.load(open('db.txt',encoding='utf-8'))
print('剩余票数是: %s' %data['count'])
def get():
data=json.load(open('db.txt',encoding='utf-8'))
if data['count'] > 0:
data['count']-=1
time.sleep(random.randint(1,3)) #模拟网络延迟
json.dump(data,open('db.txt','w',encoding='utf-8'))
print('%s 购票成功' %os.getpid())
def task(lock):
search() #查票是并发执行的
lock.acquire() #上锁
get() #抢票上了锁,得一个一个来
lock.release() #释放锁
if __name__ == '__main__':
lock=Lock() #互斥锁只能acuquire一次,要等到这个进程运行完释放锁之后才能第二进程acuquire
for i in range(10):
p=Process(target=task,args=(lock,))
p.start()
#db.txt
{"count": 1}
互斥锁:哪个程序先抢到锁,就先执行,等程序执行完毕后释放锁后,其他程序才能抢到锁,再执行。跟join的效果是一样的,可以保证一个一个的执行而不是并发执行,要想让某个功能不并发执行可以用互斥锁,与join的区别是互斥锁可以把局部变成串行。而join是把整体变成串行。
#mutex(互斥锁的英文名)一定要传给子进程
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search():
data=json.load(open('db.txt',encoding='utf-8'))
print('剩余票数是: %s' %data['count'])
def get():
data=json.load(open('db.txt',encoding='utf-8'))
if data['count'] > 0:
data['count']-=1
time.sleep(random.randint(1,3)) #模拟网络延迟
json.dump(data,open('db.txt','w',encoding='utf-8'))
print('%s 购票成功' %os.getpid())
def task():
search()
lock.acquire()
get()
lock.release()
lock=Lock() #不把互斥锁传给子进程相当于造了10个对象,也就是10把锁,每个进程一把锁,就不会影响其他进程
if __name__ == '__main__':
for i in range(10):
p=Process(target=task)
p.start()
三、队列
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
from multiprocessing import Process,Queue
q=Queue(3) #指定队列的大小
q.put({'count':10}) #往队列里面放数据,什么类型的数据都可以
q.put('a')
q.put('1')
# q.put(2) #队列大小是3个。也就是只能放3个数据,超过3个,就会卡主
# q.put_nowait(2) #即使满了也不会卡住,而是会报一个异常,可以监控这个异常,然后让进程去干别的事情
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
# print(q.get_nowait()) #队列里面即使没有数据,也不会卡主住。而是报一个异常,可以监控这个异常。然后让进程去干别的事情
四、生产者和消费者模型
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Process,Queue
import time
import random
def producer(name,food,q):
for i in range(1):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print('%s 生产了 %s' %(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1, 3))
print('%s 吃了 %s' %(name,res))
if __name__ == '__main__':
q=Queue() #产生一个队列
p1=Process(target=producer,args=('egon','泔水',q)) #将队列通过实参传给子进程
p2=Process(target=producer,args=('贱哥','屎',q))
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('alex',q))
c3=Process(target=consumer,args=('alex',q))
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join() #等生产者都生产完了再一个put一个None
p2.join()
q.put(None)
q.put(None)
q.put(None)
此种方法有弊端就是有几个消费者就要put几个None
下面这种方法可以解决此问题
from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print('%s 生产了 %s' %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1, 3))
print('%s 吃了 %s' %(name,res))
q.task_done() #表示一个数据已经从队列中取走,并处理完毕
if __name__ == '__main__':
q=JoinableQueue() #可以调join方法
p1=Process(target=producer,args=('egon','泔水',q))
p2=Process(target=producer,args=('贱哥','屎',q))
c1=Process(target=consumer,args=('alex',q))
c2=Process(target=consumer,args=('alex',q))
c3=Process(target=consumer,args=('alex',q))
c1.daemon=True #设置为守护进程
c2.daemon=True
c3.daemon=True
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join() #等待生产者生成完毕才结束进程
p2.join()
q.join() #等待队列被取干净才结束,也就是consumer函数最后一次循环运行完最后一行代码
print('主')
如何控制消费者结束,这里使用的守护进程
五、共享数据
from multiprocessing import Process,Manager,Lock #Manager可以用来创造出来一个共享内存
import time
def task(d,lock):
with lock:
temp=d['count'] #刚开始'count'=10
time.sleep(1) #睡一秒足够十个进程都开启来了,看到的初始值都是'count'=10
d['count']=temp-1 #如果不上锁,十个进程修改一个共享内存就会发生错乱,造成'count'=9,影响数据安全
if __name__ == '__main__':
lock=Lock()
m=Manager() #调用类产生一个对象
d=m.dict({"count":10}) #就可以通过对象调用类中的方法,产生一个共享的字典d
p_l=[]
for i in range(10):
p=Process(target=task,args=(d,lock))
p_l.append(p)
p.start() #开启十个子进程
for p in p_l:
p.join() #保证十个子进程都运行完毕
print(d)
执行结果:
{'count': 0}
六、回调机制
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests #request模块就是模拟浏览器向某个站点发起get、post等http请求
import os
import time
def get(url):
print('%s GET %s' %(os.getpid(),url))
response=requests.get(url)
if response.status_code == 200:
return {'url':url,'text':response.text}
def parse(res):
res=res.result() #取future对象的执行结果等于res,执行结果就是get函数的返回值
url=res['url']
text=res['text']
print('%s parse %s res:%s' %(os.getpid(),url,len(text)))
if __name__ == '__main__':
urls = [
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
#使用一般方法
# p=ProcessPoolExecutor() #创造一个进程池
# start=time.time()
# l=[]
# for url in urls:
# furture=p.submit(get,url) #向进程池内提交任务,会运行get函数
# l.append(furture)
# p.shutdown(wait=True) #关闭进程池,并且任务都执行完毕
#
# for furture in l:
# parse(furture)
#
# print(time.time()-start) #3.5100128650665283
总结:此种方法的缺点是所有的爬取任务都执行完毕后才由主进程执行解析的任务,耗时比较长
#使用回调机制
p=ProcessPoolExecutor()
start=time.time()
for url in urls:
future=p.submit(get, url) #向进程池提交任务,并绑定一个future对象
future.add_done_callback(parse) #parse(futrue) #表示当有一个爬取任务执行完毕后,就会把future对象当做参数传给parse函数,实现解析功能,这个就是回调机制
p.shutdown(wait=True)
print(time.time()-start) #2.6896419525146484
print(os.getpid())
总结:此种方法就是利用回调机制,可以实现有一个爬取任务执行完毕就会通知主进程去执行解析任务,耗时短
七、开启线程两种方式
什么是线程
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程,也就是进程执行任务时是进程中的主控制线程在执行任务,执行任务的单位也是线程。
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。并且造线程的速度快。
例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。
方法一
from threading import Thread
from multiprocessing import Process
import os
import time
import random
def task():
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
print('%s is done' %os.getpid())
if __name__ == '__main__':
t=Thread(target=task,) #发现开启线程速度很快,会后打印主
# t=Process(target=task,) #造进程很慢,会先打印主
t.start()
print('主')
方法二
from threading import Thread
# from multiprocessing import Process
import os
import time
import random
class Mythread(Thread):
def __init__(self,name):
super().__init__() #继承父类的init方法
self.name=name
def run(self):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
print('%s is done' %os.getpid())
if __name__ == '__main__':
t=Mythread('线程1') #产生开启线程的对象
t.start() #就会执行类中的run方法
print('主',os.getpid())
八、同一进程内的多个线程共享数据
from threading import Thread
n=100
def task():
global n
n=0
if __name__ == '__main__':
t=Thread(target=task) #开启一个线程
t.start()
t.join() #等待线程内的任务执行完毕才打印主
print('主',n)
执行结果:
主 0
发现开启线程修改线程内的变量,也会修改进程内的变量,因为他们共享一个内存空间
九、互斥锁(线程)
from threading import Thread,Lock
import time
n=100
def task():
global n
lock.acquire() #加锁后,100个线程会抢这把锁,谁先抢到锁谁先改数据,等这个线程改完后其他线程才能抢这把锁,这100线程会串行执行,改数据就不会发生错乱
temp=n
time.sleep(0.1)
n=temp-1
lock.release()
if __name__ == '__main__':
lock=Lock() #产生锁的对象
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print('主',n)
十、GIL解释器锁
Python GIL(Global Interpreter Lock)
如果多个线程的target=work,那么执行流程是
多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码。

垃圾回收线程在拿到解释器锁的时候就会执行垃圾回收机制,如果拿到这个解释器锁后线程1还没有释放这个GIL锁,垃圾回收线程是不会执行的,要等到释放锁后才能执行。这样就保证线程1在执行x=100的时候,垃圾回收不会执行回收100的操作,其根本就是一把互斥锁,只不过是加载解释器级别的锁
有了GIL的存在,同一时刻同一进程中只有一个线程被执行,所以线程只能实现并发。不能实现并行,只能通过cpu的切换看起来是一起执行的,GIL解释器锁是python自动加上的,只要开一个进程,在这个进程内就会有GIL解释器锁的存在
听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地
计算密集型实例
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(4):
p=Process(target=work) # 8.546488761901855
# p=Thread(target=work) #20.968199253082275
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
发现计算密集型使用多进程的时间短,因为多进程会利用多核优势
IO密集型实例
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(400):
p=Process(target=work) #34.637956619262695
# p=Thread(target=work) # 2.0041143894195557
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
发现IO密集型的任务使用多线程时间短
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

比如,线程1和线程2都执行count+=1的操作,刚开始时看到的都是count=0,如果线程1抢到解释器锁,执行count+1时时间到了,被要求释放解释器锁,此时线程1就会停在当前位置,线程2抢到解释器锁,开始执行,最终执行完了count+1操作,会释放解释器锁,此时线程1再抢到解释器锁,就会接着前面停止的位置继续执行,也会执行count+1=0+1,而应该是等于2才对,就会造成数据修改的错误,此时就需要lock互斥锁,来保证用户自己数据的安全,这就是为什么加解释器锁后还要加lock互斥锁的原因
十一、守护线程
from threading import Thread
import os
import time
import random
def task():
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
print('%s is done' %os.getpid())
if __name__ == '__main__':
t=Thread(target=task,)
t.daemon=True
t.start()
print('主')
执行结果:
11204 is runing
主
对于多线程,守护线程什么时候结束呢。这个进程内所有的非守护线程都结束了,这个守护线程才算结束
举例
from threading import Thread
import time
def foo():
print(123)
time.sleep(1) #线程1任务睡1秒
print("end123")
def bar():
print(456)
time.sleep(3) #线程2任务睡3秒
print("end456")
if __name__ == '__main__':
t1=Thread(target=foo)
t2=Thread(target=bar) #开启两个线程
t1.daemon=True #线程1是守护线程,要等线程2执行完毕后才能结束
t1.start()
t2.start()
print("main-------")
执行结果:
123
456
main-------
end123
end456
十二、死锁与递归锁
死锁示例
from threading import Thread,Lock,current_thread,RLock
import time
mutexA=Lock()
mutexB=Lock()
#相当于产生两个对象,也就是两把锁
class Mythread(Thread): #继承Thread父类
def run(self):
self.f1() #产生的对象t,t.f1()就会调用类中的f1方法,调用类会产生一个对象,这个对象就会调用类中的属性和方法
self.f2() #先执行发f1方法,f1方法执行完了再执行f2方法
def f1(self):
mutexA.acquire() #首先这个10个线程先去抢A锁
print('%s 拿到A锁' %self.name) #self.name等于current_thread().getName(),就是获取当前线程的线程名字
mutexB.acquire()
print('%s 拿到B锁' %self.name)
mutexB.release()
mutexA.release() #只要一释放A锁。其他九个线程就会抢A锁
def f2(self):
mutexB.acquire()#线程1拿到B锁
print('%s 拿到B锁' % self.name) # current_thread().getName()
time.sleep(0.1) #睡0.1秒的过程中线程2抢到了A锁
mutexA.acquire() #此时线程1也要抢A锁,就抢不到了,而线程2要拿B锁也拿不到了,因为线程1还没有释放B锁
print('%s 拿到A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t=Mythread() #产生一个对象t,t=self,产生10个线程
t.start() #执行run方法
执行结果:
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
递归锁示例
递归锁可以acquire多次,当acquire计数为0时。其他线程才能抢这把锁
from threading import Thread,Lock,current_thread,RLock#RLock是递归锁
import time
mutexA=mutexB=RLock() #A和B是同一把锁,就是一把递归锁
class Mythread(Thread): #继承Thread父类
def run(self):
self.f1() #产生的对象t,t.f1()就会调用类中的f1方法,调用类会产生一个对象,这个对象就会调用类中的属性和方法
self.f2()
def f1(self):
mutexA.acquire() #线程1先拿到这个把锁,acquire1次,计数为1
print('%s 拿到A锁' %self.name) #self.name等于current_thread().getName(),就是获取当前线程的线程名字
mutexB.acquire() #acquire2次,计数为2
print('%s 拿到B锁' %self.name)
mutexB.release() #释放1次,计数为1
mutexA.release() #释放2次,计数为0,其他线程可以抢这个把锁了
def f2(self):
mutexB.acquire() #结果线程1又抢到这把锁,acquire11次。计数为1
print('%s 拿到B锁' % self.name) # current_thread().getName()
time.sleep(0.1)
mutexA.acquire() #计数为2
print('%s 拿到A锁' % self.name)
mutexA.release() #释放1次
mutexB.release() #释放两次,计数为0,其他线程可以抢锁了
if __name__ == '__main__':
for i in range(10):
t=Mythread() #产生一个对象t,t=self,产生10个线程
t.start() #执行run方法
十三、信号量和线程池
信号量可以保证同一时间有多个线程在运行,而互斥锁是保证同一时间只能有一个线程在运行
from threading import Thread,Semaphore,current_thread
import time,random
sm=Semaphore(5) #限号量相当于公共厕所,这里规定有5个坑
def task():
with sm:
print('%s 正在上厕所' %current_thread().getName())
time.sleep(random.randint(1,3))
if __name__ == '__main__':
for i in range(20):
t=Thread(target=task) #产生开启线程的对象
t.start() #开启线程,执行task方法
信号量是先造出多个线程,然后控制只能有几个同时运行,而进程池和线程池是向进程池内提交任务,进程和线程并没有造出来,只有提交到进程池和线程池内的任务才会造进程,然后执行任务
线程池
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor
import time,random
def task(id):
print('%s 正在上厕所' %current_thread().getName())
time.sleep(random.randint(1,3))
if __name__ == '__main__':
t=ThreadPoolExecutor(5) #指定线程池数量,不指定,默认是cpu核心数的5倍
for i in range(20):
t.submit(task,i) #向线程池内提交任务,i是给task函数传递的实参
# t.map(task,range(20)) 此种写法为for循环的简写
t.shutdown(wait=True) #关闭线程池
十四、定时器
from threading import Timer
def hello(id):
print("hello, world",id)
t = Timer(1, hello,args=(30,)) #表示1秒后执行hello这个函数,传递的参数是30
t.start() # after 1 seconds, "hello, world" will be printed
表示定时什么时候执行这个任务,此示例是1秒后执行打印这个任务
十五、线程queue
队列
import queue
q=queue.Queue() #队列,先进先出
q.put('1')
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
执行结果:
1
2
3
堆栈
q=queue.LifoQueue() #堆栈,后进先出,last in fast out
q.put('1')
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
执行结果:
3
2
1
优先级队列
q=queue.PriorityQueue() #元组或者列表,第一个元素是优先级,数字越小,优先级越高
q.put((10,'a')) #以元组或者列表的形式去放数据
q.put((3,'b'))
q.put((-1,'b'))
print(q.get())
print(q.get())
print(q.get())
执行结果:
(-1, 'b')
(3, 'b')
(10, 'a')
十六、事件event
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作。
from threading import Thread,Event,current_thread
import time,random
def conn_mysql():
count=1
while not event.is_set(): #如果event状态值不是True,就一直循环
if count > 3:
raise TimeoutError('链接超时') #手动触发一个异常
print('<%s>第%s次尝试链接' % (current_thread().getName(), count))
event.wait(1) #线程在执行时阻塞1s,然后执行下面的代码
count+=1
print('<%s>链接成功' %current_thread().getName()) #如果event状态值为True,就跳出循环打印这句话
def check_mysql():
print('\033[45m[%s]正在检查mysql\033[0m' % current_thread().getName())
time.sleep(random.randint(2,4))
event.set() #默认是False,event.set()设置event状态值为True
if __name__ == '__main__':
event=Event()
conn1=Thread(target=conn_mysql) #开启两个连接线程
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql) #开启一个检查线程
conn1.start()
conn2.start()
check.start()
十七、协程
协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
遇到IO阻塞时会自动切换任务
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat) #开启协程去执行任务eat
g2=gevent.spawn(play) #开启协程去执行任务play
gevent.joinall([g1,g2]) #表示g1和g2都执行完毕才打印主
print('主')
执行结果:
eat food 1
play 1
play 2
eat food 2
主
同步和异步
from gevent import spawn,joinall,monkey;monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous():
for i in range(10):
task(i)
def asynchronous():
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
从执行结果可以看出:同步的时候是一个任务一个任务的执行,而异步的时候10个任务几乎同时执行,加快执行效率。
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
协程应用:爬虫
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' %url)
response=requests.get(url)
if response.status_code == 200:
print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.python.org/'),
gevent.spawn(get_page,'https://www.baidu.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
Gevent之应用举例二
通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
服务端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr) #会没有阻塞的去执行talk函数。实现异步执行,不然服务器端会阻塞,直到客户端断开连接后另外一个客户端才能接收到数据,可以和前面的加上通讯循环与链接循环的简单套接字通讯的例子对比
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客户端
# from socket import *
#
# client=socket(AF_INET,SOCK_STREAM)
# client.connect(('127.0.0.1',8080))
#
#
# while True:
# msg=input('>>: ').strip()
# if not msg:continue
#
# client.send(msg.encode('utf-8'))
# msg=client.recv(1024)
# print(msg.decode('utf-8'))
多线程并发多个客户端
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(5):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()
十八 IO模型
- 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。
- 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程
阻塞IO(blocking IO)
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
非阻塞IO(non-blocking IO)
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
但是也难掩其缺点:
- 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
- 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
多路复用IO(IO multiplexing)
IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
强调:
1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
该模型的优点:
相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
该模型的缺点:
首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
异步IO(Asynchronous I/O)
Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:

受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
IO模型比较分析
到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。
有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IO Model的比较如图所示:

经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。