asyncio并发编程-上
asyncio
是Python
中解决异步I/O高并发的一个模块。
asyncio的事件循环
我们先看下asyncio
有哪些功能:
-
包含各种特定系统实现的模块化事件循环(针对不同系统都能兼容的事件循环:例如Windows下的
select
,linux下的epoll
。) -
传输和协议抽象(对TCP和UDP协议的抽象)
-
对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
-
模仿
futures
模块但适用于事件循环使用的Future
类 -
基于
yield from
的协议和任务,可以让我们使用顺序的方式编写并发代码 -
必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件迁移到线程池
-
模仿
threading
模块中的同步原语,可以用在单线程内的协程之间
前面我们学习了协程,但是协程脱离事件循环意义就不是很大了。
下面我们开始学习asyncio
的使用吧!💪
首先我们明确一点,高并发异步IO编程的编码模式由三部分组成:
事件循环+回调(驱动生成器)+epoll
(IO多路复用)
asyncio
是Python
用于解决异步io编程的一整套解决方案
有趣的小知识:
tornado
也是基于asyncio
的异步框架,通过协程和事件循环来完成高并发。相对于Django
和Flask
这种传统的阻塞IO框架本身不提供web服务器,不会去完成Socket
编码的,因此我们在部署的时候会搭配实现了SOcket
编码的框架(uwsgi, gunicorn+nginx)。Tornado
实现了自己的web服务器,因此我们部署Tornado
的时候是可以直接部署的(会使用epoll
来完成socket请求),但是真正部署的时候,还是会使用nginx
来完成一些操作(IP限制等)。因此Tornado
的数据库驱动就不能使用阻塞IO驱动框架了。
asyncio
的简单使用:
协程要搭配事件循环才能使用
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
# 我们使用 asyncio 实现的事件循环 这个loop就可完成 之前我们自己实现的 事件循环 select 的操作
loop = asyncio.get_event_loop()
# 可以使用 run_until_complete 进行协程的调用 这是一个阻塞函数 可以理解为多线程编程中的jion方法 然后把 asyncio理解为协程池
loop.run_until_complete(get_html("http://www.imooc.com"))
print(time.time()-start_time)
# 输出
start get url
end get url
2.0019102096557617
我们可以同时执行多个协程,传入一个可迭代的任务对象
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# 这个 tasks 可以是不同的协程
tasks = [get_html("http://www.imooc.com") for i in range(10)]
# asyncio.wait()函数会接收一个可迭代对象
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)
# 输出就不打印了 耗时大概两秒
注意:在协程中不能使用同步的时间睡眠
time.sleep()
,否则当执行的协程超过一个的时候就会出现同步阻塞的情况。
要是哪个小伙伴想测试下上面那句话,可以将上面的代码await asyncio.sleep(2)
改为time.sleep(2)
你会发现运行的时间不再是两秒了,而是20+秒。
为什么不能再协程使用同步的sleep
呢?
这就要说到我们的loop
小朋友了,协程要配合事件循环的,我们在运行协程的时候当遇到await
关键字就知道这是一个异步阻塞操作了,会在此处暂停返回一个Future
对象,然后由loop
小朋友再执行已经可以运行的协程。这样保证了能够异步执行操作。当我们直接在协程中使用sleep
同步操作时候,不会暂停而是一直等待,这就是原因😊
如何获得协程的返回值呢?
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "红烧肉"
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# 这里使用 asyncio.ensure_future 来获得一个future对象 是不是很像多线程编程中的 submit
get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
# 也可以使用 loop 的 create_task 两者用法一样
# task = loop.create_task()
# task 是 future 的子类
# 可以将future 对象传入到 run_until_complete
loop.run_until_complete(get_future)
# 通过 future 对象的 result函数获得结果
print(get_future.result())
# 输出
start get url
红烧肉
上面的代码还可以这么写
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
loop.run_until_complete(task)
print(task.result())
我们看到使用loop.create_task
和asyncio.ensure_future
是一样的效果,具体区别我们稍后会学习到。💪
有没有小伙伴怀疑,当使用asyncio.ensure_future
的时候是何时和我们创建的loop
建立联系的呢,是在loop.run_until_complete(get_future)
的时候吗?
让我们看下ensure_future
的源码:
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
# 看这里 当没有loop传入的时候,会获得当前loop 因为线程中只有这个一个 loop 这里启动loop和外层代码的loop是同一个loop
if loop is None:
loop = events.get_event_loop()
# 我们看到 内部同样是使用 create_task
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')
除了上面直接调用协程,我们还可以在协程执行完成之后进行一个回调。
import asyncio
import time
from functools import partial
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "红烧肉"
# 当我们想要在 回调函数中传递参数的时候 注意 future 参数写在最后
def callback(url, future):
print(url)
print("send email to 红烧肉")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
task.add_done_callback(partial(callback, "http://www.imooc.com"))
loop.run_until_complete(task)
print(task.result())
我们使用partial
将传入的参数,伪造成一个函数。
回调函数会默认接收一个 future 对象参数
wait和gather
我们上面已经使用了wait
来进行多协程的运行,我们看下它的源码:
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The sequence futures must not be empty.
Coroutines will be wrapped in Tasks.
Returns two sets of Future: (done, pending).
Usage:
done, pending = yield from asyncio.wait(fs)
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError('Invalid return_when value: {}'.format(return_when))
if loop is None:
loop = events.get_event_loop()
fs = {ensure_future(f, loop=loop) for f in set(fs)}
return (yield from _wait(fs, timeout, return_when, loop))
这个wait
我们理解为多线程中的wait
,同样存在return_when
参数,可以指定何时返回。
那gather
如何使用呢?
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.imooc.com") for i in range(10)]
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time()-start_time)
我们将wait
直接修改为gather
然后可迭代对象加上*
即可。
两者的区别是什么呢?
-
gather
更加hight-level -
gather
可以将协程分组
group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.imooc.com") for i in range(2)]
# 我们可以分组传递
loop.run_until_complete(asyncio.gather(*group1, *group2))
# 我们可以将先进行gather操作
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
loop.run_until_complete(asyncio.gather(group1, group2))
# 我们可以批量取消某个分组
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
group2.cancel()
task取消和子协程调用原理
我们先看下run_until_complete
和run_forever
两个函数的区别。
run_until_complete
在运行完指定的协程之后就会停止,而run_forever
则会一直运行。
看下源码:
image.png在图片中我们看到run_until_complete
里面同样使用了run_forever
。但是,增加了一个回调_run_until_complete_cb
:
def _run_until_complete_cb(fut):
exc = fut._exception
if (isinstance(exc, BaseException) and not isinstance(exc, Exception)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()
在回调函数中当没有协程运行的时候会将loop
即事件循环直接暂停。
asyncio
会将loop
放到future
中,而future
同样会被放到loop
中。因此我们可以在任何一个任务中停止掉 loop
如何取消协程中的task(future)
import asyncio
async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))
if __name__ == "__main__":
task1 = get_html(2)
task2 = get_html(3)
task3 = get_html(3)
tasks = [task1, task2, task3]
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
# 我们发送一个 controle + c 异常
except KeyboardInterrupt as e:
# 获得所有的task
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
print("cancel task")
# 将task 取消 返回布尔值
print(task.cancel())
# 先将 loop 暂停
loop.stop()
# 记得将 loop 再次运行 run_forever 否则将报错
loop.run_forever()
finally:
# 最后 关闭 loop
loop.close()
有咩有小伙伴对all_tasks = asyncio.Task.all_tasks()
这句代码疑惑?
我们看了源码就知道了 因为全局只有一个loop
,所以能够在任何位置轻松获得loop
相关的信息。
如何在协程中插入子协程
我们看一段官方文档的代码:
官方文档叫chain coroutines
链式协程?
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
compute()
is chained to print_sum()
: print_sum()
coroutine waits until compute()
is completed before returning its result.
序列图:
image.png图中展示大致意思:当我们运行一个协程的时候,立即创建一个Task
,由EventLoop
驱动Task
,然后Task
驱动print_sum
。当协程中调用了另外一个子协程的时候,是直接由Task
和子协程通信的。直至子协程运行完毕抛出StopIteration
异常,然后父协程会捕捉到异常并提取出结果,父协程运行完毕,同样抛出异常,逐层往上抛出然后终止Task
。重点在于Task
和子协程compute
之间的通道,以及异常抛出拦截。
The “Task” is created by the
AbstractEventLoop.run_until_complete()
method when it gets a coroutine object instead of a task.
意思是,图中的Task
并不是一个任务而是一个协程对象。
The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses
AbstractEventLoop.call_later()
to wake up the task in 1 second.
意思是,图大致讲了如何在协程中调用子协程,但是内部实现没有体现出来。
例如:当调用asyncio.sleep(1.0)
的时候会创建一个内部的future
对象然后使用 AbstractEventLoop.call_later()
在一秒后唤醒任务。
asyncio中的其他函数
call_soon 函数
import asyncio
def callback(sleep_times):
print(f"success time {sleep_times}")
def stoploop(loop):
loop.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 这里传入的是函数名称 不是协程 因为很多时候 我们希望在循环体系中插入一个函数
# call_soon 是即刻执行 比不是下一行代码执行 而是等到下一个循环的时候执行
loop.call_soon(callback, 2)
# 停止时间循环
loop.call_soon(stoploop, loop)
# 因为我们传入的不是协程 而是函数 因此启动要使用 run_forever
loop.run_forever()
call_later 函数
import asyncio
def callback(sleep_times):
print(f"success time {sleep_times}")
def stoploop(loop):
loop.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# call_later 是延迟调用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.run_forever()
# 输出
success time 1
success time 2
success time 3
从输出看出 call_later
并不是根据添加的顺序执行的 而是根据延迟的时间。
为了进一步比较call_later
和call_soon
的区别我们看下下面代码的输出
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# call_later 是延迟调用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
# 输出
success time 4
success time 1
success time 2
success time 3
我们看到call_soon
执行是比call_later
要早的 是下个循环立即执行
call_at函数
call_at
函数可以让我们指定时间运行回调函数,这里的时间是 loop
里面的时间 不是传统的时间
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 获得loop的当前时间
loop_time = loop.time()
# 使用 call_at 在 当前时间的基础上 延迟几秒执行回调
loop.call_at(loop_time + 2, callback, 2)
loop.call_at(loop_time + 1, callback, 1)
loop.call_at(loop_time + 3, callback, 3)
loop.call_soon(callback, 4)
loop.run_forever()
# 输出
success time 4
success time 1
success time 2
success time 3
call_soon_threadsafe 函数
这是一个线程安全的函数 作用和 call_soon
一样
asyncio
是可以在多线程环境下运行的,asyncio
是一整套的异步IO解决方案,不仅可以解决协程调度问题,还可以解决线程、进程问题。
def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread-safe."""
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_tracebac
# 又这个函数实现线程安全的
self._write_to_self()
return handle
当我们在多线程中 多个回调函数使用了一个变量 可以使用这个来保证线程安全