Python 异步IO和协程
01. 并发、并行、同步、异步、阻塞、非阻塞概念理解
并发、并行
并发:是指一个时间段内,有几个程序在同一个CPU上运行,但是任意时刻只有一个程序在CPU上运行。由于CPU的运行速度极快,可以在多个程序之间切换,这样造成一个假象就是多个程序同时在运行。
并行:是指任意时刻点上,有多个程序同时运行在多个CPU上。并行的数量与CPU的核心数一致,即CPU为4核,并行数量最多为4。
同步、异步
同步:是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式。
异步:是指代码调用IO操作时,不必等待IO操作完成就返回的调用方式。
阻塞、非阻塞
阻塞:是指调用函数的时候当前线程被挂起。
非阻塞:是指调用函数的时候当前线程不会被挂起,而是立即返回。
02. IO多路复用
select,poll,epoll都是IO多路复用的机制,但三者本质上都是同步IO,因为他们都需要在读写事件就绪后自己负责读写,也就是说这个读写过程是阻塞的,而异步IO则无需自己负责读写,异步IO的实现会负责把数据从内核拷贝到用户空间。
相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件符管理多个文件描述符,将用户关系的文件描述的事件存放到内核的一个事件表中,这样在用户空间和内和空间的copy只需要一次。
epoll并不代表一定比select好
在高并发的情况下,连接活跃度不是很高,epoll比select好
在并发不高的情况下,同时连接很活跃,select比epoll好
02. 使用select完成http请求
这里跟着学习的内容写下这段代码,实际并没有理解,其中涉及到回调+事件循环+select(poll/epoll),在单线程中实现并发,待补充基础知识后,返回来再次学习。也欢迎读者进行指点。
# 使用select完成http请求
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
urls = ["http://www.baidu.com"]
stop = False
class Fetcher:
def connected(self, key):
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf-8"))
selector.register(self.client.fileno(), EVENT_READ, self.readable)
def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf-8")
# 使去掉头部信息
html_data = data.split("\r\n\r\n")[1]
print(data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True
def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"
# 建立socket链接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass
# 注册
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop():
# 事件循环,不停的请求socket的状态调用对应的回调函数
# 1. select本身是支持register模式
# 2. socket状态变化之后的回调是由程序员完成的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
# 回调+事件循环+select(poll\epoll)
if __name__ == '__main__':
fetcher = Fetcher()
fetcher.get_url("http://www.baidu.com")
loop()
回调的痛楚:1. 可读性差 2. 共享状态管理难 3. 异常处理困难
03. coroutine - 协程的理解
抛出的问题:
- 回调模式编码复杂度高
- 同步编程额并发性不高
- 多线程编程需要线程间同步,使用lock机制又会降低并发性能
解决方案:
- 采用同步的方式去编写异步的代码
- 采用单线程去切换任务:
- 线程是有操作系统去切换的,单线程切换意味着需要自己去调度任务
- 不再需要lock,单线程内切换函数,性能远高于线程切换,并发性更高
这时候就需要有一个可以暂定的函数,并且在需要时还可以恢复函数继续执行,接着协程就出现了。
协程理解:有多个入口的函数、可以暂停的函数、可以暂停的函数(可以向暂停的地方传入值)
然而生成器有一个很大的特点就是可以暂停!! 可以尝试使用生成器去完成协程。
04. 生成器进阶 - send、close和throw方法
生成器不但可以产出值,还可以接收值
send()调试
def gen_func():
# 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
html = yield "http://projectsedu.com"
print(html)
return "maben"
if __name__ == "__main__":
gen = gen_func()
# 在调用send发送非none值之前,我们必须启动生成器,方式有两种:1.gen.send(None),2.next(gen)
url = gen.send(None)
html = "maben"
close调试
def gen_func():
# 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
try:
html = yield "http://projectsedu.com"
except BaseException:
pass
yield 2
yield 3
return "maben"
if __name__ == '__main__':
gen = gen_func()
print(next(gen))
gen.close()
print("maben")
# GenerationExit是继承自BaseException,而不是Exception
throw调试
def gen_func():
# 1. 可以产出值 2. 也可以接收值(调用方传递过来值)
try:
yield "http://projectsedu.com"
except Exception as e:
pass
yield 2
yield 3
return "maben"
if __name__ == '__main__':
gen = gen_func()
print(next(gen))
gen.throw(Exception, "download error")
print(next(gen))
gen.throw(Exception, "download error")
经过上面的调试,可以发现生成器可以进行启动、关闭、传入值、处理异常等,更够符合协程的需要。
05. 查看生成器状态
# 生成器是可以暂停的函数
import inspect
def gen_func():
yield 1
return "maben"
if __name__ == '__main__':
gen = gen_func()
print(inspect.getgeneratorstate(gen))
next(gen)
print(inspect.getgeneratorstate(gen))
try:
next(gen)
except StopIteration:
pass
print(inspect.getgeneratorstate(gen))
>>> GEN_CREATED
>>> GEN_SUSPENDED
>>> GEN_CLOSED
06. 生成器进阶 - yield from
yield from 与 yield 的区别
def g1(iterable):
yield iterable
def ge(iterable):
yield from iterable
for value in g1(range(5)):
print(value)
for value in ge(range(5)):
print(value)
>>> range(0, 5)
>>> 0
>>> 1
>>> 2
>>> 3
>>> 4
yield from 关键作用
def g1(gen):
yield from gen
def main():
g = g1()
g.send(None)
# 1.main(调用方) g1(委托生成器) gen(子生成器)
# 2.yield from 会在调用方和子生成器之间建立一个双向通道
yield from工作原理(暂时不理解,给自己挖的坑。。。)
"""
源码先省略
总结:
1. 子生成器产生的值,都是直接传递给调用方的;调用方法通过.send()发送的值都是直接传递给子生成器的;
如果发送的是None,会调用生成器的__next__()方法,如果不是None,会调用子生成器的.send()方法;
2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上“冒泡”;
5. 传入委托生成器的异常里,除了GeneratorEixt之外,其他的所有异常全部传递给子生成器的.throw()方法;
如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上“冒泡”;
6. 如果在委托生成器上调用.close()或传入GenerationExit异常,会调用子生成器的.close()方法,没有的话就不用调用了。
如果在调用.close()的时候那么就想上“冒泡”,否则的话委托生成器会抛出GeneratorExit异常。
"""
07. async 和 await 原生协程
# python为了将语义变得更加明确,就引入了async和await关键词用于定义原生的协程。
async def downloader(url):
return "maben"
async def download_url(url):
# dosomething
html = await downloader(url)
return html
if __name__ == '__main__':
coro = download_url("http://www.imooc.com")
# next(None)
coro.send(None)
>>> StopIteration: maben