python中的asyncio

2019-05-28  本文已影响0人  KayFelicities

概念

协程coroutine

指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
协程 是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不同位置暂停或开始执行程序”。从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。
人们不应该将async/await等同于asyncio,而应该将asyncio看作是一个利用async/await API 进行异步编程的框架。

事件循环event_loop

程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
事件循环提供一种循环机制,让你可以“在A发生时,执行B”。基本上来说事件循环就是监听当有什么发生时,同时事件循环也关心这件事并执行相应的代码。Python 3.4 以后通过标准库 asyncio 获得了事件循环的特性。

任务task

一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。

future

代表将来执行或没有执行的任务的结果。它和task上没有本质的区别。

async/await

python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

线程threding、进程multiprocessing、协程asyncio

Python因为有GIL(全局解释锁)这玩意,不可能有真正的多线程的存在,因此很多情况下都会用multiprocessing实现并发,而且在Python中应用多线程还要注意关键地方的同步,不太方便,用协程代替多线程和多进程是一个很好的选择,因为它吸引人的特性:主动调用/退出,状态保存,避免cpu上下文切换等。
多进程和多线程除了创建的开销大之外还有一个难以根治的缺陷,就是处理进程之间或线程之间的协作问题,因为是依赖多进程和多线程的程序在不加锁的情况下通常是不可控的,而协程则可以完美地解决协作问题,由用户来决定协程之间的调度。

写法

async

调用协程函数必须用await

await

await必须在协程函数中使用

协程并发

import asyncio

async def a():  # 定义一个协程,比普通的函数多了async关键字
    print('Suspending a')
    await asyncio.sleep(0)  # 在协程中挂起(释放控制权),await 后面接的方法必须是awaitable的
    print('Resuming a')

async def b():
    print('In b')

async def main():
    await asyncio.gather(a(), b())    # 并发运行任务,另一种写法:asyncio.wait([a(), b()], )

if __name__ == '__main__':
    """ python3.6:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    """
    asyncio.run(main())  # 启动事件循环,python3.7新写法

输出:

Suspending a
In b
Resuming a

asyncio.gather 和 asyncio.wait

# 返回值:协程的返回值list
return_value_a, return_value_b, return_value_c = await asyncio.gather(a(), b(), c()) 

# 返回值:完成的协程和未完成的协程
done, pending = await asyncio.wait([a(), b(), c()], return_when=asyncio.tasks.FIRST_COMPLETED)

任务

import asyncio
import time

async def a(): 
    print('Suspending a')
    await asyncio.sleep(1)
    print('Resuming a')

async def b():
    print('In b')
    await asyncio.sleep(2)
    print('out b')

async def main():
    taska = asyncio.create_task(a()) #python 3.6使用asyncio.ensure_future(a())
    taskb = asyncio.create_task(b()) #任务一旦建立即开始运行,不信你在上面一行加一个延时
    #此时任务已经在运行

    await taska #这里只是等待任务结束,并不是启动任务
    await taskb #甚至可以用await asyncio.sleep(2)来等待任务结束

if __name__ == '__main__':
    """ python3.6:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    """
    start_tm = time.time()
    asyncio.run(main())  # 启动事件循环,python3.7新写法
    print(f"tm use: {time.time() - start_tm: .1f}")

输出:

Suspending a
In b
Resuming a
out b
tm use:  2.0

例子

  1. 来源:https://stackoverflow.com/questions/54787401/how-to-properly-use-asyncio-first-completed
import asyncio
import random
async def grab_proxy(taskid):
    await asyncio.sleep(random.uniform(0.1, 1))
    result = random.choice([None, None, None, 'result'])
    print(f'Task #{taskid} producing result {result!r}')
    return result

async def task_manager():
    tasks = [grab_proxy(i) for i in range(10)]
    while tasks:
        finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for x in finished:
            result = x.result()
            print(f"Finished task produced {result!r}")
            if result:
                # cancel the other tasks, we have a result. We need to wait for the cancellations
                # to propagate.
                print(f"Cancelling {len(unfinished)} remaining tasks")
                for task in unfinished:
                    task.cancel()
                if unfinished:  # stackoverflow原答案里没有这个条件,如果unfinished为空,下面的wait会抛出异常
                    await asyncio.wait(unfinished)
                return result
        tasks = unfinished

def get_proxy_loop():
    loop = asyncio.new_event_loop()
    proxy = loop.run_until_complete(task_manager())
    loop.close()
    return proxy

get_proxy_loop()

2.来源:https://stackoverflow.com/questions/52582685/using-asyncio-queue-for-producer-consumer-flow

import asyncio, random, time

async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)

async def producer(queue):
    while True:
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)

async def consumer(queue):
    while True:
        token = await queue.get()
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')

async def main():
    queue = asyncio.Queue()

    # fire up the both producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

asyncio.run(main())

参考

http://blog.rainy.im/2016/03/09/how-the-heck-does-async-await-work-in-python-3-5/

上一篇 下一篇

猜你喜欢

热点阅读