04、异步与非阻塞
项目GIthub源码地址:[https://github.com/vannesspeng/TornadoForum]
1、tornado提供了哪些功能
Tornado是一个Python web框架和异步网络库,它最初基于FriendFeed库之上开发。通过使用非阻塞的网络IO,Tornado可以扩展到数万个开放连接,使其成为长轮询, WebSockets和其他需要与每个用户建立长期连接的应用程序的理想选择 。Tornado Web框架和HTTP服务器一起提供了WSGI的全栈替代方案
Tornado大致可分为四个主要部分:
- Web框架(包括
RequestHandler
子类,用于创建Web应用程序和各种支持类)。 - HTTP(
HTTPServer
和AsyncHTTPClient
)的客户端和服务器端实现。 - 一个异步网络库,包括类
IOLoop
和IOStream
,它们作为HTTP组件的构建块,也可用于实现其他协议。 - 一个协程库(
tornado.gen
),它允许以比链接回调更直接的方式编写异步代码。这类似于Python 3.5 中引入的本机协同程序功能。如果可用,建议使用本机协程代替模块。async def
tornado.gen
2、同步、异步、阻塞和非阻塞的概念
一些事实
1、cpu的速度远高于io速度
2、io包括文件访问和本地文件访问,比如requests、urllibs等传统的网络库都是同步的io
3、网络io大部分的的时间都是处于等待状态,在等待的时候cpu是空闲的,但是又不能执行其他操作。
阻塞与非阻塞
阻塞是指调用函数时,当前线程被挂起。
非阻塞是指调用函数时,当前线程不会被挂起,而是立即返回。
同步与非异步
同步:所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。(比如你叫别人做饭给你吃,别人去做饭了,你就一直在餐桌边等待饭做好)
异步:异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。(比如你通知别人做饭,如果别人同意会立即返回1,你等待饭做好的这段时间,可以去做其他的事情,比如看电视、玩游戏,做好之后,别人总会有通知你的方式;如果别人拒绝给你做饭,她也会立刻通知你,返回-1,这个时候也不会产生阻塞;还有一种情况,是别人答应你做饭,结果过一会儿,又反悔说不做了,但也会通知你。但是最后吃饭,你需要自己先去获取通知信息,才能去端饭吃)
同步和异步关注的是获取结果的方式。同步是获取到结果之后才进行下一步操作,阻塞非阻塞关注的是调用接口时当前线程的状态,同步可以调用阻塞也可以调用非阻塞。异步是调用非阻塞接口。
3、socket的非阻塞io请求html
这里首先介绍传统的通过request库来请求html
import requests
html = requests.get("http://www.baidu.com").text
# 需要等待Tcp协议的三次握手来建立连接后,才能获取html文本,属于阻塞式的IO
print(html)
接下来使用socket的方式来实现
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "www.baidu.com"
client.connect((host, 80)) # 同样也属于阻塞IO方式
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", host).encode("utf-8"))
data = b""
while 1:
d = client.recv(1024)
if d:
data += d
else:
break
data = data.decode("utf-8")
print(data)
那么如何实现非阻塞式的IO呢,方法如下:
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/18 15:58
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False) # 设置client为非阻塞式IO的模式
host = "www.baidu.com"
try:
client.connect((host, 80)) # 阻塞IO
except BlockingIOError as e:
# 做一些其他的事情,提供CPU使用率,比如爬虫程序可以爬取另外一个网页的数据
pass
while 1:
try:
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", host).encode("utf-8"))
print("send success")
break
except OSError as e:
pass
data = b""
while 1:
try:
d = client.recv(1024)
except BlockingIOError as e:
continue
if d:
data += d
else:
break
data = data.decode("utf-8")
print(data)
这种方式虽然实现了非阻塞式的IO请求html,但是还是使用while循环去等待某种状态就绪,其实还是存在阻塞的嫌疑,那有没有方法可以实现,当方法执行状态成功时,可以主动通知调用者呢,比如当client连接远程主机成功时,我们马上可以知道连接成功的状态,答案是有的,那就是我们下一小节要介绍的select、poll和epoll。
select、poll和epoll
selct,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符。一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上就是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select
select函数监视的文件描述符分为三类:writefds, readfds,exceptfds。调用select后,select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回,设为null即可)函数返回,这个时候程序就会系统调用,将数据从kernel复制到进程缓冲区。。当select函数返回后,可以遍历fdset来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好夸平台支持也是它的一个优点。
select的缺点
1、根据fd_size的定义,它的大小为32个整数大小(32位机器为32*32,所有共有1024bits可以记录fd),每个fd一个bit,所以最大只能同时处理1024个fd。
2、每一次呼叫 select( ) 都需要先从 user space把 FD_SET复制到 kernel(约线性时间成本)
为什么 select 不能像epoll一样,只做一次复制就好呢?
每一次呼叫 select()前,FD_SET都可能更动,而 epoll 提供了共享记忆存储结构,所以不需要有 kernel 與 user之间的数据沟通。
3、每次要判断【有哪些event发生】这件事的成本很高,也就是遍历fd_set,因为select(polling也是)采取主动轮询机制.
poll
poll的原理与select非常相似,差别如下:
- 描述fd集合的方式不同,poll使用 pollfd 结构而不是select结构fd_set结构,所以poll是链式的,没有最大连接数的限制
- poll有一个特点是水平触发,也就是通知程序fd就绪后,这次没有被处理,那么下次poll的时候会再次通知同个fd已经就绪。
epoll
epoll 提供了三个函数:
1、int epoll_create(int size);
建立一個 epoll 对象,并传回它的id
2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
事件注册函数,将需要监听的事件和需要监听的fd交给epoll对象
3、int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待注册的事件被触发或者timeout发生
epoll解决的问题:
-
epoll没有fd数量限制
epoll没有这个限制,我们知道每个epoll监听一个fd,所以最大数量与能打开的fd数量有关,一个g的内存的机器上,能打开10万个左右 -
epoll不需要每次都从user space 将fd set复制到内核kernel
epoll在用epoll_ctl函数进行事件注册的时候,已经将fd复制到内核中,所以不需要每次都重新复制一次 -
select 和 poll 都是主動輪詢機制,需要拜訪每一個 FD;epoll是被动触发方式,给fd注册了相应事件的时候,我们为每一个fd指定了一个回调函数,当数据准备好之后,就会把就绪的fd加入一个就绪的队列中,epoll_wait的工作方式实际上就是在这个就绪队列中查看有没有就绪的fd,如果有,就唤醒就绪队列上的等待者,然后调用回调函数。
-
虽然epoll。poll。epoll都需要查看是否有fd就绪,但是epoll之所以是被动触发,就在于它只要去查找就绪队列中有没有fd,就绪的fd是主动加到队列中,epoll不需要一个个轮询确认。
换一句话讲,就是select和poll只能通知有fd已经就绪了,但不能知道究竟是哪个fd就绪,所以select和poll就要去主动轮询一遍找到就绪的fd。而epoll则是不但可以知道有fd可以就绪,而且还具体可以知道就绪fd的编号,所以直接找到就可以,不用轮询。
自己去实现事件循环
如下先看一个select实例
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
selector = DefaultSelector()
class Fetcher:
def connected(self, key):
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", self.host).encode("utf8"))
selector.register(self.client.fileno(), EVENT_READ, self.readble)
def readble(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
print(data)
def get_url(self, url):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)
self.data = b""
self.host = "www.baidu.com"
try:
self.client.connect((self.host, 80)) # 阻塞io, 意味着这个时候cpu是空闲的
except BlockingIOError as e:
# 做一些其他事
pass
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop_forever():
#事件循环
while 1:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
if __name__ == "__main__":
fetcher = Fetcher()
url = "http://www.baidu.com"
fetcher.get_url(url)
loop_forever()
以上select实例是基于方法回调模式的实现,它一下几个缺点:
1、回调过深,造成代码维护非常困难。
2、栈撕裂,造成异常无法抛出(比如connected、readble方法出现异常是无法抛出的)
要解决以上问题,就需要利用下一小节中介绍的协程。
协程
协程,又称微线程,纤程。英文名Coroutine。
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1
2
x
y
3
z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。
看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。
来看例子:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
import time
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
首先调用c.next()启动生成器;
然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
consumer通过yield拿到消息,处理,又通过yield把结果传回;
produce拿到consumer处理的结果,继续生产下一条消息;
produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
最后套用Donald Knuth的一句话总结协程的特点:
“子程序就是协程的一种特例。”
tornado中实现协程的方式如下所示:
使用async关键字标记协程,await关键字调用协程并获取协程的结果,强烈建议弃用@ coroutine装饰器来标记协程。
#1. 什么是协程
#1.回调过深造成代码很难维护
#2.栈撕裂造成异常无法向上抛出
#协程- 可以被暂停并切换到其他协程运行的函数
from tornado.gen import coroutine
async def yield_test():
yield 1
yield 2
yield 3
async def main():
result = await yield_test()
result = await yield_test2()
async def main2():
await yield_test()
my_yield = yield_test()
for item in my_yield:
print(item)
AsyncHttpClient异步http请求
这里直接给出实例,获取百度网页数据:
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/20 14:49
from tornado import httpclient
# 这里的HTTPClient是阻塞IO的形式
# http_client = httpclient.HTTPClient()
# try:
# response = http_client.fetch("http://www.baidu.com/")
# print(response.body.decode("utf-8"))
# except httpclient.HTTPError as e:
# # HTTPError is raised for non-200 responses; the response
# # can be found in e.response.
# print("Error: " + str(e))
# except Exception as e:
# # Other errors are possible, such as IOError.
# print("Error: " + str(e))
# http_client.close()
# 定义协程函数
async def f():
http_client = httpclient.AsyncHTTPClient()
try:
response = await http_client.fetch("http://www.baidu.com")
except Exception as e:
print("Error: %s" % e)
else:
print(response.body.decode("utf-8"))
if __name__ == "__main__":
#启动事件循环
import tornado
io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(f)
tornado实现高并发的爬虫
1、安装需要使用的库
pip install beautifulsoup4
2、代码编写
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/20 15:46
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from tornado import gen, httpclient, ioloop, queues
base_url = "http://www.tornadoweb.org/en/stable/"
# 协程数量
concurrency = 20
async def get_url_links(url):
response = await httpclient.AsyncHTTPClient().fetch("http://www.tornadoweb.org/en/stable/")
html = response.body.decode("utf8")
soup = BeautifulSoup(html)
# 获取到所有的a标签,也就是所有的url,并与base_url进行拼接
links = [urljoin(base_url, a.get("href")) for a in soup.find_all("a", href=True)]
return links
async def main():
seen_set = set() # 已经爬取过的网页url集合
q = queues.Queue() # 待爬取的网页url队列
async def fetch_url(current_url):
#生产者
#已经爬取过的url,就直接return
if current_url in seen_set:
return
print("获取: {}".format(current_url))
seen_set.add(current_url)
next_urls = await get_url_links(current_url)
for new_url in next_urls:
if new_url.startswith(base_url):
#放入队列,
await q.put(new_url)
# 消费者协程
async def worker():
async for url in q:
if url is None:
return
try:
await fetch_url(url)
except Exception as e:
print("excepiton")
finally:
# 已爬取的网页从队列中剔除
q.task_done()
#放入初始url到队列
await q.put(base_url)
#启动协程
workers = gen.multi([worker() for _ in range(concurrency)])
await q.join() # 等待q队列为空,也就是所有的url都爬取完毕,此时生产者协程停止
#在队列中放入三个None,去停止消费者协程
for _ in range(concurrency):
await q.put(None)
await workers
if __name__ == "__main__":
io_loop = ioloop.IOLoop.current()
io_loop.run_sync(main)