Python 基于协程的异步 IO
转载请注明出处
异步 IO
参考链接
我们使用的是 Python 3 内置的模块 asyncio
概念说明
-
event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
-
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
例如:# 这样就是注册了一个协程 async def test1(): pass return 'ok' coroutine = test1()
-
task 任务:一个协程对象就是一个原生可以挂起的函数,task 则是对协程进一步封装,其中包含了任务的各种状态
-
future: 代表将来执行或没有执行的任务的结果。它和 task 上没有本质上的区别
-
async/await 关键字:Python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口。
定义一个简单的协程
import asyncio
# 这里定义了一个协程函数
async def test():
pass
return 'ok'
# 这里是提前写好的回调函数
def test(future):
print(future)
# 这里是返回了一个协程对象
coroutine = test()
# 把协程对象封装成 task
task = asyncio.ensure_future(coroutine)
# 创建消息循环
loop = asyncio.get_event_loop()
# 设置回调函数,我们通过回调函数,当任务执行完毕,就会把结果传到回调函数中
task.add_done_callback(test)
# 把 task 加入到消息队列中去运行,这一步一定要到最后
# 这是一个阻塞式的操作
loop.run_until_complete(task)
await 关键字
await
就类似于 yield from
使用 async 可以定义协程对象,使用 await 可以针对耗时的操作进行挂起,就像生成器里的 yield 一样,函数让出控制权。协程遇到 await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行 。
这里我们用内置的 asyncio.sleep(1) ,模拟网络请求
import asyncio
async def test():
# 这里就是使用 await 关键字,遇到这个,事件循环就会挂起该协程,然后执行其他协程。
# 直到其他的协程也挂起或者执行完毕,才继续执行下面的代码,异步执行的,就是说总时间不是叠加,而是算最长的
await asyncio.sleep(1)
return 'ok'
def a(future):
print(future.result())
coroutine = test()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
task.add_done_callback(a)
loop.run_until_complete(task)
并发操作(多个协程)
import asyncio
import time
start = time.clock()
async def test(a):
await asyncio.sleep(a)
return 'ok'
coroutine1 = test(1)
coroutine2 = test(1)
coroutine3 = test(1)
coroutine4 = test(1)
# 这里就是多个任务。
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
asyncio.ensure_future(coroutine4),
]
loop = asyncio.get_event_loop()
# loop.run_until_complete(asyncio.gather(*tasks))
# 两个方式都可以,实现并发的时候,要把协程像下面一样并列的执行,
# 如果有协程嵌套,那么子协程如果要并发,可以使用 await asyncio.gather(*tasks)
results = loop.run_until_complete(asyncio.wait(tasks))
end = time.clock()
print(end - start)
# 可以通过这个来获取每一个,协程的返回值,Python 官方不推荐这样做,不推荐使用返回值,而是直接在协程中完成任务
for item in results[0]:
print(item.result())
run_until_complete 和 run_forever
我们一直通过 run_until_complete
来运行 loop ,等到 future 完成,run_until_complete
也就返回了。
import asyncio
async def do_some(n):
print('等待1秒')
await asyncio.sleep(n)
print('完成')
return 'ok'
loop = asyncio.get_event_loop()
coroutine = do_some(1)
task = asyncio.ensure_future(coroutine)
# 等所有的 task 都执行完了,这个函数就会结束,程序退出。
res = loop.run_until_complete(task)
print(res.result())
现在改用 run_forever
import asyncio
async def do_some(n):
print('等待1秒')
await asyncio.sleep(1)
print('完成')
coro = do_some(1)
asyncio.ensure_future(coro)
loop = asyncio.get_event_loop()
# 一秒后结果输出,但是,线程一直被阻塞,程序没有退出。
loop.run_forever()
使用run_forever()
,future 结束,但是程序并不会退出。run_forever
会一直运行,知道 loop.stop()
被调用,但是你不能像下面这样调用
loop.run_forever()
loop.stop()
run_forever()
不返回,stop
永远也不会被调用,所以我们只能在协程中调用 stop
,如下
async def do_some(loop):
print('Waiting 3 second')
await asyncio.sleep(3)
print('Done')
loop.stop()
这样并非没有问题,假设有多个协程在 loop 里面运行
import asyncio
async def do_some(loop, n):
print('等待{}秒'.format(n))
await asyncio.sleep(n)
print('{} 完成'.format(n))
# 第二个协程还没有结束,loop 就停止了,被先完成的那个协程给 stop
loop.stop()
loop = asyncio.get_event_loop()
coro1 = do_some(loop, 1)
coro2 = do_some(loop, 2)
asyncio.ensure_future(coro1)
asyncio.ensure_future(coro2)
loop.run_forever()
要解决这个问题,那我们就要使用 gather
把多个协程合并为一个 future ,并添加回调。然后在回调中停止。
asyncio.gather 和 asyncio.ensure_future 可以把新的任务加入到事件循环当中
import asyncio
import functools
async def do_some(n):
print('等待{}秒'.format(n))
await asyncio.sleep(n)
print('{} 完成'.format(n))
# 这个 future 是我们执行后的结果集
def done_callback(loop, future):
loop.stop()
pass
loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
# 这里式使用了偏函数,当然我们也可以使用闭包
futures.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()
--------------------------------------------
# 使用闭包
def done_callback(loop):
def inner(future):
loop.stop()
return inner
loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
futures.add_done_callback(done_callback(loop))
loop.run_forever()
---------------------------------------------
这样就可以啦,其实我们这里就是手动的实现了 run_until_complete
,而且 run_until_complete
内部也是调用了run_forever
。
Close Loop ?
以上示例都没有调用 loop.close
,好像也没有什么问题。所以到底要不要调 loop.close
呢? 简单来说,loop 只要不关闭,就还可以再运行。:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果关闭了,就不能再运行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
# 此处异常
loop.run_until_complete(do_some_work(loop, 3))
建议调用 loop.close,以彻底清理 loop 对象防止误用。
生产者消费者
import asyncio
from asyncio import Queue
async def producer(queue):
print('开始生成数据')
for i in range(100):
await queue.put(i)
print('已经生产数据 {}'.format(i))
if queue.qsize() >= 10:
await queue.join()
async def comsumer(queue):
print('开始消费数据')
while True:
item = await queue.get()
print('消费了数据 {}'.format(item))
queue.task_done()
loop = asyncio.get_event_loop()
queue = Queue()
loop.run_until_complete(asyncio.gather(producer(queue), comsumer(queue)))
协程 API
loop = asyncio.get_event_loop() # 获取事件循环
loop = run_until_complete() # 开启循环
asyncio.ensure_future() # 把协程加入到循环,直到调用 loop.run_forever() 才会执行
asyncio.gather() # 把更多的协程加入到循环
asyncio.Semaphore(5) # 限制同时运行的协程数量
Semaphore 例子
import asyncio
import aiohttp
# Semaphore 只要放在协程里面就可以,用 async with 修饰,windows 限制了 Semap 的数量,Linux 没有限制。
class AsyncRequest(object):
sem = asyncio.Semaphore(500)
@classmethod
async def request(cls, url=None, method='GET', cookies=None, params=None, data=None, headers=None, encode='utf8'):
async with cls.sem:
async with aiohttp.ClientSession(cookies=cookies) as session:
if method == 'GET':
async with session.get(url, params=params, headers=headers) as response:
return {
'text': await response.text(encoding=encode),
'url': response.url,
'headers': response.headers,
'read': await response.read(),
'status_code': response.status,
'cookies': response.cookies
}
elif method == 'POST':
async with session.post(url, data=data, headers=headers) as response:
return {
'text': await response.text(encoding=encode),
'url': response.url,
'headers': response.headers,
'read': await response.read(),
'status_code': response.status,
'cookies': response.cookies
}