asyncio异步 I/O

2020-06-19  本文已影响0人  wit92

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio 提供一组 高层级 API 用于:

此外,还有一些 低层级 API 以支持 库和框架的开发者 实现:

协程

协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。例如,以下代码段 (需要 Python 3.7+) 打印 "hello",等待 1 秒,然后打印 "world":

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:简单地调用一个协程并不会将其加入执行日程:

>>> main()
<coroutine object main at 0x1053bb7c8>

要真正运行一个协程,asyncio 提供了三种主要机制:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

预期的输出:

started at 17:13:52
hello
world
finished at 17:13:55

asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。

让我们修改以上示例,并发 运行两个 say_after 协程:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

可等待对象

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务Future.

协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文档中 "协程" 可用来表示两个紧密关联的概念:

asyncio 也支持旧式的 基于生成器的 协程.

任务

任务 被用来设置日程以便 并发 执行协程。

当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future 对象

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()

运行 asyncio 程序

创建任务

asyncio.``create_task(coro)

coro 协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future()函数。

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

休眠

coroutine asyncio.``sleep(delay, result=None, ***, loop=None)

阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

loop 参数已弃用,计划在 Python 3.10 中移除。

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

并发运行任务

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

在 3.7 版更改: 如果 gather 本身被取消,则无论 return_exceptions 取值为何,消息都会被传播。

屏蔽取消操作

awaitable asyncio.``shield(aw, ***, loop=None)

保护一个 可等待对象 防止其被 取消

如果 aw 是一个协程,它将自动作为任务加入日程。

以下语句:

res = await shield(something())

相当于:

res = await something()

不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发 CancelledError

如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。

如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

超时

coroutine asyncio.``wait_for(aw, timeout, ***, loop=None)

等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

如果 aw 是一个协程,它将自动作为任务加入日程。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeoutNone,则等待直到完成。

如果发生超时,任务将取消并引发 asyncio.TimeoutError.

要避免任务 取消,可以加上 shield()

函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。

如果等待被取消,则 aw 指定的对象也会被取消。

loop 参数已弃用,计划在 Python 3.10 中移除。

示例:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版更改:aw 因超时被取消,wait_for 会等待 aw 被取消。之前版本则将立即引发 asyncio.TimeoutError

简单等待

coroutine asyncio.``wait(aws, ***, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

如果 aws 中的某个可等待对象为协程,它将自动作为任务加入日程。直接向 wait() 传入协程对象已弃用,因为这会导致 令人迷惑的行为

返回两个 Task/Future 集合: (done, pending)

用法:

done, pending = await asyncio.wait(aws)

loop 参数已弃用,计划在 Python 3.10 中移除。

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

注解

wait() 会自动将协程作为任务加入日程,以后将以 (done, pending) 集合形式返回显式创建的任务对象。因此以下代码并不会有预期的行为:

注解

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

以上代码段的修正方法如下:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

asyncio.``as_completed(aws, ***, loop=None, timeout=None)

并发地运行 aws 集合中的 可等待对象。返回一个 Future 对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。

如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError

示例:

for f in as_completed(aws):
    earliest_result = await f
    # ...

来自其他线程的日程安排

asyncio.``run_coroutine_threadsafe(coro, loop)

向指定事件循环提交一个协程。线程安全。

返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

此函数应该从另一个 OS 线程中调用,而非事件循环运行所在线程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程内产生了异常,将会通知返回的 Future 对象。它也可被用来取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

查看 并发和多线程 章节的文档。

不同与其他 asyncio 函数,此函数要求显式地传入 loop 参数。

3.5.1 新版功能.

内省

Task 对象

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

基于生成器的协程

注解: 对基于生成器的协程的支持 已弃用 并计划在 Python 3.10 中移除。

基于生成器的协程是 async/await 语法的前身。它们是使用 yield from 语句创建的 Python 生成器,可以等待 Future 和其他协程。

基于生成器的协程应该使用 @asyncio.coroutine 装饰,虽然这并非强制。

上一篇 下一篇

猜你喜欢

热点阅读