Python 借助 asyncio 实现并发编程
asyncio 基础
创建协程
使用 async 关键字创建 coroutine
async def coroutine_add_one(number: int) -> int:
return number + 1
def add_one(number: int) -> int:
return number + 1
function_result = add_one(1)
coroutine_result = coroutine_add_one(1)
print(
f'Function result is {function_result} and the type is {type(function_result)}')
# => Function result is 2 and the type is <class 'int'>
print(
f'Coroutine result is {coroutine_result} and the type is {type(coroutine_result)}')
# => Coroutine result is <coroutine object coroutine_add_one at 0x7f9f495f20a0> and the type is <class 'coroutine'>
# => sys:1: RuntimeWarning: coroutine 'coroutine_add_one' was never awaited
创建 coroutine 和创建普通的函数一样直接,唯一的区别在于使用 async def
而不是 def
。
当我们直接调用协程 coroutine_add_one
时,传入的参数并没有被加 1 然后返回计算结果,我们只是得到了一个 coroutine object
。
即我们只是创建了一个能够在之后的某个时间运行的 coroutine 对象,为了运行它,我们总是需要显式地将其放入 event loop 中。最简单的方式就是使用 asyncio.run
函数。
运行 coroutine
import asyncio
async def coroutine_add_one(number: int) -> int:
return number + 1
result = asyncio.run(coroutine_add_one(1))
print(result)
# => 2
asyncio.run
是 asyncio 应用程序的入口。
使用 await 关键字暂停执行
asyncio 的真正用处,在于能够在一个长时间运行的操作过程中,暂停执行,从而令 event loop 有机会处理其他任务。“暂停”的动作通过 await
关键字触发。await
后面通常紧跟着一个对 coroutine (更严谨地说,一个 awaitable
对象)的调用。
import asyncio
async def add_one(number: int) -> int:
return number + 1
async def main() -> None:
one_plus_one = await add_one(1)
two_plus_one = await add_one(2)
print(one_plus_one)
# => 2
print(two_plus_one)
# => 3
asyncio.run(main())
首先 await
对协程 add_one(1)
的调用,此时父协程(即 main()
)被暂停,add_one(1)
执行并获取结果(2
),main()
协程恢复执行,将结果赋值给 one_plus_one
;同样地,对协程 add_one(2)
的 await
也会导致 main()
被暂停和恢复。
sleep
前面的例子只是为了介绍协程的基本语法,并没有涉及任何 long-running 操作,因而也没有享受到 asyncio 在并发方面的作用。我们可以借助 asyncio.sleep
函数模拟 web API 请求或者数据库查询等长时间运行的操作,asyncio.sleep
能够令某个协程“睡眠”指定的时间(秒)。
asyncio.sleep
本身就是一个协程,因而当我们在某个协程中 await asyncio.sleep
时,其他部分代码就得到了执行的机会。
sleep 实现 delay 函数
# util.py
import asyncio
async def delay(delay_seconds: int) -> int:
print(f'sleeping for {delay_seconds} second(s)')
await asyncio.sleep(delay_seconds)
print(f'finished sleeping for {delay_seconds} second(s)')
return delay_seconds
运行两个协程
import asyncio
from util import delay
async def add_one(number: int) -> int:
return number + 1
async def hello_world_message() -> str:
await delay(1)
return 'Hello Wrold!'
async def main() -> None:
message = await hello_world_message()
one_plus_one = await add_one(1)
print(one_plus_one)
print(message)
# => sleeping for 1 second(s)
# => finished sleeping for 1 second(s)
# => 2
# => Hello Wrold!
asyncio.run(main())
运行上面的代码,先是等待 1 秒钟,之后才是两个函数调用的结果被打印出来。我们本来希望看到的是,两个协程并发地执行,add_one(1)
的结果直接被输出,并不需要等待 hello_world_message()
中的 sleep
结束。
实际上 await
会暂停其所在的协程(这里是 main
),并且不会执行当前协程中的任何其他代码,直到 await
表达式获得一个结果。hello_world_message
需要 1 秒钟才能返回结果,因而 main 协程也会被暂停 1 秒钟。排在它后面的 add_one(1)
在暂停结束后执行并返回结果。
上面的代码和同步、顺序执行的代码没有表现出任何区别。为了实现并发,我们需要引入一个新的概念 task。
tasks
Task 是对协程的一种包装,能够将一个协程调度至 event loop 并争取尽快执行。这种调度是以一种非阻塞的方式发生的,即 task 被创建后会立即返回,不必等待其运行结束,从而我们能够有机会执行其他代码。
并发地执行多个 task
import asyncio
from util import delay
async def hello_every_second():
for i in range(2):
await asyncio.sleep(1)
print("I'm running other code while I'm waiting!")
async def main():
first_delay = asyncio.create_task(delay(3))
second_delay = asyncio.create_task(delay(3))
await hello_every_second()
await first_delay
await second_delay
asyncio.run(main())
# => sleeping for 3 second(s)
# => sleeping for 3 second(s)
# => I'm running other code while I'm waiting!
# => I'm running other code while I'm waiting!
# => finished sleeping for 3 second(s)
# => finished sleeping for 3 second(s)
上述代码创建了 2 个 task,每个都需要 3 秒钟才能执行完毕。两次对 create_task
的调用都会立即返回。由于 task 调度的原则是尽快执行,当后面的 await
代码刷新了一次 event loop 之后,前面创建的 2 个 task 会立即被执行(非阻塞)。
两个 delay task 在 sleep
过程中,应用是闲置的,我们得以有机会运行其他代码。协程 hello_every_second
每隔 1 秒输出一条消息。整个应用总的运行时间大约是 3 秒,即大约等于耗时最长的异步任务的时间,而不是像顺序执行的程序那样,等于多个任务运行时间的总和。
协程和任务的陷阱
将一些长时间运行的任务并发的执行,能够带来很大程度上的性能提升。因而我们会倾向于在应用的任何地方使用协程和 task。事实上,仅仅将函数用 async 修饰,将其封装进 task,并不总是带来性能上的提升。甚至有些情况下还会降低程序的效率。
最主要的情形有两种,一个是在不借助多进程的情况下,尝试在 task 或协程中运行 CPU-bound 代码;另一种是在不借助多线程的情况下调用阻塞式 I/O-bound API。
CPU 密集型任务
有时候我们需要一些函数执行 CPU 密集型的任务,比如对一个很大的字典执行循环或者数学计算。为了提升效率,我们会想着将它们放置在单独的 task 中运行。然而现实是,asyncio 使用单线程并发模型,我们依然会受到单个线程和 GIL 的限制。
计算协程运行时间
# util.py
import asyncio
import functools
import time
from typing import Callable, Any
def async_timed():
def wrapper(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapped(*args, **kwargs) -> Any:
print(f'Starting {func} with {args} {kwargs}')
start = time.time()
try:
return await func(*args, **kwargs)
finally:
end = time.time()
total = end - start
print(f'finished {func} in {total:.4f} second(s)')
return wrapped
return wrapper
@async_timed()
async def delay(delay_seconds: int) -> int:
print(f'sleeping for {delay_seconds} second(s)')
await asyncio.sleep(delay_seconds)
print(f'finished sleeping for {delay_seconds} second(s)')
return delay_seconds
运行 CPU-bound 代码
import asyncio
from util import delay, async_timed
@async_timed()
async def cpu_bound_work() -> int:
counter = 0
for i in range(100000000):
counter = counter + 1
return counter
@async_timed()
async def main():
task_one = asyncio.create_task(cpu_bound_work())
task_two = asyncio.create_task(cpu_bound_work())
delay_task = asyncio.create_task(delay(4))
await task_one
await task_two
await delay_task
asyncio.run(main())
# => Starting <function main at 0x7f2d6b85bc70> with () {}
# => Starting <function cpu_bound_work at 0x7f2d6c2bba30> with () {}
# => finished <function cpu_bound_work at 0x7f2d6c2bba30> in 2.7423 second(s)
# => Starting <function cpu_bound_work at 0x7f2d6c2bba30> with () {}
# => finished <function cpu_bound_work at 0x7f2d6c2bba30> in 2.7430 second(s)
# => Starting <function delay at 0x7f2d6b85a0e0> with (4,) {}
# => sleeping for 4 second(s)
# => finished sleeping for 4 second(s)
# => finished <function delay at 0x7f2d6b85a0e0> in 4.0048 second(s)
# => finished <function main at 0x7f2d6b85bc70> in 9.4903 second(s)
上述代码创建了 3 个 task,但实际执行时依然是顺序的而非并发的,耗费的时间并没有变少。两个 CPU-bound task 是依次执行的,甚至 delay_task
也并没有与其他两个任务呈现并发性。原因在于我们先创建了两个 CPU-bound 任务,这两个任务会阻塞 event loop,阻止其调度执行任何其他任务。
因此,总的运行时间等于两个 CPU-bound 任务执行完毕的时间加上 delay 任务运行的 4 秒。即 asyncio 并没有为 CPU-bound 的任务带来任何性能上的提升。
假如我们需要在执行 CPU-bound 任务的同时仍使用 async
语法,就必须借助多进程,告诉 asyncio 在 process pool 中执行任务。
阻塞式 API
我们也会倾向于使用现有的库执行 I/O-bound 操作,再将其封装进协程。然而,这会引起与 CPU-bound 操作同样的问题。因为这些 API 会阻塞 main 线程。
当我们在协程内部调用一个阻塞的 API,我们会阻塞 event loop 线程本身,线程被阻塞请求占据,导致 event loop 无法调度任何其他协程和任务。阻塞式 API 请求包括 requests
库和 time.sleep
等。通常来说,任何执行 I/O 操作且不是协程的函数,以及执行 CPU 密集型任务的函数,都可以认为是阻塞的。
协程内部调用阻塞式 API
import asyncio
import requests
from util import async_timed
@async_timed()
async def get_example_status() -> int:
return requests.get('http://www.example.com').status_code
@async_timed()
async def main():
task_1 = asyncio.create_task(get_example_status())
task_2 = asyncio.create_task(get_example_status())
task_3 = asyncio.create_task(get_example_status())
await task_1
await task_2
await task_3
asyncio.run(main())
# => Starting <function main at 0x7f4335080790> with () {}
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5144 second(s)
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5163 second(s)
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5177 second(s)
# => finished <function main at 0x7f4335080790> in 1.5488 second(s)
main
协程运行的总时间基本上等于所有 task 运行的时间之和。即我们并没有获取到任何并发上的收益。原因在于 requests
库是阻塞的,任何调用都会阻塞当前线程,而 asyncio 只有一个线程,在阻塞调用结束之前,线程中的 event loop 没有机会以异步的形式运行任何任务。
当你使用的库并没有返回协程,你并没有在自己的协程中使用 await
关键字,很大可能你就是在进行阻塞的函数调用。当前我们使用的大多数 API 都是阻塞的,并不支持与 asyncio 开箱即用。
要想体验到 asyncio 带来的异步和并发特性,就必须使用原生支持协程和非阻塞 socket 的库,比如 aiohttp
。或者你坚持使用 requests
库,同时又需要 async
语法,就必须显式地告诉 asyncio
使用多线程的方式,通过 thread pool executor 执行阻塞调用。
借助支持协程的库 aiohttp 实现并发
import asyncio
from aiohttp import ClientSession
from util import async_timed
@async_timed()
async def get_example_status() -> int:
session = ClientSession()
resp = await session.get('http://example.com')
await session.close()
return resp.status
@async_timed()
async def main():
task_1 = asyncio.create_task(get_example_status())
task_2 = asyncio.create_task(get_example_status())
task_3 = asyncio.create_task(get_example_status())
await task_1
await task_2
await task_3
asyncio.run(main())
# => Starting <function main at 0x7fd9f90b6a70> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function main at 0x7fd9f90b6a70> in 0.5196 second(s)
可以看到所有 task 执行的总时间,基本上只比一个 task 运行的时间多一点点。此时的程序是并发执行的。
取消任务
取消任务
每个 task
对象都有一个 cancel
方法可以帮助我们随时终止该任务。当我们 await
取消的任务时,会报出 CancelledError
异常。
比如我们调度执行某个任务,又不希望该任务运行的时间超过 5 秒:
import asyncio
from asyncio import CancelledError
from util import delay
async def main():
long_task = asyncio.create_task(delay(10))
seconds_elapsed = 0
while not long_task.done():
print('Task not finished, checking again in a second.')
await asyncio.sleep(1)
seconds_elapsed = seconds_elapsed + 1
if seconds_elapsed == 5:
long_task.cancel()
try:
await long_task
except CancelledError:
print('Our task was cancelled')
asyncio.run(main())
# => Task not finished, checking again in a second.
# => Starting <function delay at 0x7fdb383ae0e0> with (10,) {}
# => sleeping for 10 second(s)
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => finished <function delay at 0x7fdb383ae0e0> in 5.0079 second(s)
# => Our task was cancelled
需要注意的是,CancelledError
只会在 await
语句处抛出,调用 cancel
方法并不会神奇地强行关闭正在运行的任务,只有你刚好遇到 await
时任务才会被终止,不然就等待下一个 await
。
使用 wait_for 设置超时时间
每隔一段时间手动进行检查,以确定是否取消某个任务,并不算一种简单的处理方式。asyncio 提供了一个 wait_for
函数,它接收一个协程或者任务,以及超时的秒数作为参数,返回一个协程对象。
若任务运行超时,一个 TimeoutException
就会被抛出,任务自动被终止。
import asyncio
from util import delay
async def main():
delay_task = asyncio.create_task(delay(2))
try:
result = await asyncio.wait_for(delay_task, timeout=1)
print(result)
except asyncio.exceptions.TimeoutError:
print('Got a timeout')
print(f'Was the task cancelled? {delay_task.cancelled()}')
asyncio.run(main())
# => Starting <function delay at 0x7f71e18160e0> with (2,) {}
# => sleeping for 2 second(s)
# => finished <function delay at 0x7f71e18160e0> in 1.0016 second(s)
# => Got a timeout
# => Was the task cancelled? True
asyncio.shield
在另外一些情况下,我们有可能并不希望直接取消某个超时的任务,而是当任务运行时间过长时,提醒用户这个情况,但是并不执行任何 cancel 操作。
shield
可以帮助我们实现这样的功能。
from util import delay
async def main():
task = asyncio.create_task(delay(10))
try:
result = await asyncio.wait_for(asyncio.shield(task), 5)
print(result)
except asyncio.exceptions.TimeoutError:
print("Task took longer than five seconds, it will finish soon!")
result = await task
print(result)
asyncio.run(main())
# => Starting <function delay at 0x7ff344d120e0> with (10,) {}
# => sleeping for 10 second(s)
# => Task took longer than five seconds, it will finish soon!
# => finished sleeping for 10 second(s)
# => finished <function delay at 0x7ff344d120e0> in 10.0063 second(s)
# => 10