Tornado项目实战:高并发技术论坛网站

04、异步与非阻塞

2018-12-20  本文已影响3人  vannesspeng

项目GIthub源码地址:[https://github.com/vannesspeng/TornadoForum]

1、tornado提供了哪些功能

Tornado是一个Python web框架和异步网络库,它最初基于FriendFeed库之上开发。通过使用非阻塞的网络IO,Tornado可以扩展到数万个开放连接,使其成为长轮询WebSockets和其他需要与每个用户建立长期连接的应用程序的理想选择 。Tornado Web框架和HTTP服务器一起提供了WSGI的全栈替代方案
Tornado大致可分为四个主要部分:

2、同步、异步、阻塞和非阻塞的概念

一些事实
1、cpu的速度远高于io速度
2、io包括文件访问和本地文件访问,比如requests、urllibs等传统的网络库都是同步的io
3、网络io大部分的的时间都是处于等待状态,在等待的时候cpu是空闲的,但是又不能执行其他操作。

阻塞与非阻塞
阻塞是指调用函数时,当前线程被挂起。
非阻塞是指调用函数时,当前线程不会被挂起,而是立即返回。

同步与非异步
同步:所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。(比如你叫别人做饭给你吃,别人去做饭了,你就一直在餐桌边等待饭做好)
异步:异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。(比如你通知别人做饭,如果别人同意会立即返回1,你等待饭做好的这段时间,可以去做其他的事情,比如看电视、玩游戏,做好之后,别人总会有通知你的方式;如果别人拒绝给你做饭,她也会立刻通知你,返回-1,这个时候也不会产生阻塞;还有一种情况,是别人答应你做饭,结果过一会儿,又反悔说不做了,但也会通知你。但是最后吃饭,你需要自己先去获取通知信息,才能去端饭吃)

同步和异步关注的是获取结果的方式。同步是获取到结果之后才进行下一步操作,阻塞非阻塞关注的是调用接口时当前线程的状态,同步可以调用阻塞也可以调用非阻塞。异步是调用非阻塞接口。

3、socket的非阻塞io请求html

这里首先介绍传统的通过request库来请求html

import requests
html = requests.get("http://www.baidu.com").text
# 需要等待Tcp协议的三次握手来建立连接后,才能获取html文本,属于阻塞式的IO
print(html)

接下来使用socket的方式来实现

import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "www.baidu.com"
client.connect((host, 80))    # 同样也属于阻塞IO方式
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", host).encode("utf-8"))

data = b""
while 1:
    d = client.recv(1024)
    if d:
        data += d
    else:
        break

data = data.decode("utf-8")
print(data)

那么如何实现非阻塞式的IO呢,方法如下:

#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/18 15:58

import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)     # 设置client为非阻塞式IO的模式
host = "www.baidu.com"

try:
    client.connect((host, 80))  # 阻塞IO
except BlockingIOError as e:
    # 做一些其他的事情,提供CPU使用率,比如爬虫程序可以爬取另外一个网页的数据
    pass

while 1:
    try:
        client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", host).encode("utf-8"))
        print("send success")
        break
    except OSError as e:
        pass

data = b""
while 1:
    try:
        d = client.recv(1024)
    except BlockingIOError as e:
        continue

    if d:
        data += d
    else:
        break

data = data.decode("utf-8")
print(data)

这种方式虽然实现了非阻塞式的IO请求html,但是还是使用while循环去等待某种状态就绪,其实还是存在阻塞的嫌疑,那有没有方法可以实现,当方法执行状态成功时,可以主动通知调用者呢,比如当client连接远程主机成功时,我们马上可以知道连接成功的状态,答案是有的,那就是我们下一小节要介绍的select、poll和epoll。

select、poll和epoll

selct,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符。一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上就是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select
select函数监视的文件描述符分为三类:writefds, readfds,exceptfds。调用select后,select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回,设为null即可)函数返回,这个时候程序就会系统调用,将数据从kernel复制到进程缓冲区。。当select函数返回后,可以遍历fdset来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好夸平台支持也是它的一个优点。
select的缺点
1、根据fd_size的定义,它的大小为32个整数大小(32位机器为32*32,所有共有1024bits可以记录fd),每个fd一个bit,所以最大只能同时处理1024个fd。
2、每一次呼叫 select( ) 都需要先从 user space把 FD_SET复制到 kernel(约线性时间成本)
为什么 select 不能像epoll一样,只做一次复制就好呢?
每一次呼叫 select()前,FD_SET都可能更动,而 epoll 提供了共享记忆存储结构,所以不需要有 kernel 與 user之间的数据沟通。
3、每次要判断【有哪些event发生】这件事的成本很高,也就是遍历fd_set,因为select(polling也是)采取主动轮询机制.

poll
poll的原理与select非常相似,差别如下:

epoll
epoll 提供了三个函数:

1、int epoll_create(int size);
建立一個 epoll 对象,并传回它的id
2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
事件注册函数,将需要监听的事件和需要监听的fd交给epoll对象
3、int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待注册的事件被触发或者timeout发生

epoll解决的问题:

自己去实现事件循环

如下先看一个select实例

import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readble)

    def readble(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            print(data)

    def get_url(self, url):
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        self.data = b""
        self.host = "www.baidu.com"

        try:
            self.client.connect((self.host, 80))  # 阻塞io, 意味着这个时候cpu是空闲的
        except BlockingIOError as e:
            # 做一些其他事
            pass

        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop_forever():
    #事件循环
    while 1:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    fetcher = Fetcher()
    url = "http://www.baidu.com"
    fetcher.get_url(url)
    loop_forever()

以上select实例是基于方法回调模式的实现,它一下几个缺点:
1、回调过深,造成代码维护非常困难。
2、栈撕裂,造成异常无法抛出(比如connected、readble方法出现异常是无法抛出的)
要解决以上问题,就需要利用下一小节中介绍的协程。

协程

协程,又称微线程,纤程。英文名Coroutine。

协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def A():
    print '1'
    print '2'
    print '3'

def B():
    print 'x'
    print 'y'
    print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。

看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。

来看例子:

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。

如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    c.next()
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()
    produce(c)

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:

首先调用c.next()启动生成器;

然后,一旦生产了东西,通过c.send(n)切换到consumer执行;

consumer通过yield拿到消息,处理,又通过yield把结果传回;

produce拿到consumer处理的结果,继续生产下一条消息;

produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

最后套用Donald Knuth的一句话总结协程的特点:

“子程序就是协程的一种特例。”

tornado中实现协程的方式如下所示:
使用async关键字标记协程,await关键字调用协程并获取协程的结果,强烈建议弃用@ coroutine装饰器来标记协程。

#1. 什么是协程
#1.回调过深造成代码很难维护
#2.栈撕裂造成异常无法向上抛出
#协程- 可以被暂停并切换到其他协程运行的函数
from tornado.gen import coroutine

async def yield_test():
    yield 1
    yield 2
    yield 3

async def main():
    result = await yield_test()
    result = await yield_test2()

async def main2():
    await yield_test()

my_yield = yield_test()
for item in my_yield:
    print(item)

AsyncHttpClient异步http请求

这里直接给出实例,获取百度网页数据:

#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/20 14:49


from tornado import httpclient
# 这里的HTTPClient是阻塞IO的形式
# http_client = httpclient.HTTPClient()
# try:
#     response = http_client.fetch("http://www.baidu.com/")
#     print(response.body.decode("utf-8"))
# except httpclient.HTTPError as e:
#     # HTTPError is raised for non-200 responses; the response
#     # can be found in e.response.
#     print("Error: " + str(e))
# except Exception as e:
#     # Other errors are possible, such as IOError.
#     print("Error: " + str(e))
# http_client.close()


# 定义协程函数
async def f():
    http_client = httpclient.AsyncHTTPClient()
    try:
        response = await http_client.fetch("http://www.baidu.com")
    except Exception as e:
        print("Error: %s" % e)
    else:
        print(response.body.decode("utf-8"))

if __name__ == "__main__":
    #启动事件循环
    import tornado
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.run_sync(f)

tornado实现高并发的爬虫

1、安装需要使用的库
pip install beautifulsoup4

2、代码编写

#!/usr/bin/env python
#-*- coding:utf-8 -*-
# author:pyy
# datetime:2018/12/20 15:46
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from tornado import gen, httpclient, ioloop, queues

base_url = "http://www.tornadoweb.org/en/stable/"
# 协程数量
concurrency = 20

async def get_url_links(url):
    response = await httpclient.AsyncHTTPClient().fetch("http://www.tornadoweb.org/en/stable/")
    html = response.body.decode("utf8")
    soup = BeautifulSoup(html)
    # 获取到所有的a标签,也就是所有的url,并与base_url进行拼接
    links = [urljoin(base_url, a.get("href")) for a in soup.find_all("a", href=True)]
    return links

async def main():
    seen_set = set()   # 已经爬取过的网页url集合
    q = queues.Queue()   # 待爬取的网页url队列

    async def fetch_url(current_url):
        #生产者
        #已经爬取过的url,就直接return
        if current_url in seen_set:
            return

        print("获取: {}".format(current_url))
        seen_set.add(current_url)
        next_urls = await get_url_links(current_url)
        for new_url in next_urls:
            if new_url.startswith(base_url):
                #放入队列,
                await q.put(new_url)
    # 消费者协程
    async def worker():
        async for url in q:
            if url is None:
                return
            try:
                await fetch_url(url)
            except Exception as e:
                print("excepiton")
            finally:
                # 已爬取的网页从队列中剔除
                q.task_done()

    #放入初始url到队列
    await q.put(base_url)

    #启动协程
    workers = gen.multi([worker() for _ in range(concurrency)])
    await q.join()  # 等待q队列为空,也就是所有的url都爬取完毕,此时生产者协程停止

    #在队列中放入三个None,去停止消费者协程
    for _ in range(concurrency): 
        await q.put(None)

    await workers


if __name__ == "__main__":
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)
上一篇 下一篇

猜你喜欢

热点阅读