Python 学习笔记19 - 异步IO

2017-09-14  本文已影响0人  WesleyLien

一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理

消息模型是如何解决同步IO必须等待IO操作这一问题的呢?当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程:

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

协程

协程,又称微线程,纤程。英文名Coroutine

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。

子程序就是协程的一种特例

和多线程比,协程的优势在于:

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

Python 对协程的支持是通过 generator 实现的:
generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。
但是 Python 的 yield 不但可以返回一个值,它还可以接收调用者发出的参数

传统的生产者-消费者模型改用协程:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    # 调用c.send(None)启动生成器
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

注意到 consumer 函数是一个 generator ,把一个 consumer 传入 produce 后:

  1. 首先调用 c.send(None) 启动生成器;
  2. 然后,一旦生产了东西,通过 c.send(n) 切换到 consumer 执行;
  3. consumer 通过 yield 拿到消息,处理,又通过 yield 把结果传回;
  4. produce 拿到 consumer 处理的结果,继续生产下一条消息;
  5. produce 决定不生产了,通过 c.close() 关闭 consumer ,整个过程结束。

asyncio

asyncio 的编程模型就是一个消息循环。我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步IO

import threading
import asyncio

# @asyncio.coroutine 把一个 generator 标记为 coroutine 类型
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    
    # 异步调用asyncio.sleep(1)
    # yield from 语法可以让我们方便地调用另一个 generator
    # 由于 asyncio.sleep() 也是一个 coroutine ,所以线程不会等待 asyncio.sleep() ,而是直接中断并执行下一个消息循环
    # 当 asyncio.sleep() 返回时,线程就可以从 yield from 拿到返回值(此处是None),然后接着执行下一行语句
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 把这个 coroutine 扔到 EventLoop 中执行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

asyncio 的异步网络连接来获取sina、sohu和163的网站首页:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

async/await

asyncio 提供的 @asyncio.coroutine 可以把一个 generator 标记为 coroutine 类型,然后在 coroutine 内部用 yield from 调用另一个 coroutine 实现异步操作

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法 asyncawait ,可以让 coroutine 的代码更简洁易读

要使用新的语法,只需要做两步简单的替换:

  1. @asyncio.coroutine 替换为 async
  2. yield from 替换为 await

对比一下上一节的代码:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

用新语法重新编写如下:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

aiohttp

asyncio可以实现单线程并发IO操作

如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架

$ pip install aiohttp
上一篇下一篇

猜你喜欢

热点阅读