python程序员python热爱者

Python3 异步协程函数async具体用法

2017-11-09  本文已影响5168人  予岁月以文明

之前使用Python的人往往纠缠在多线程,多进程,评判哪个效率更高?
其实,相对于别家的协程和异步,不管多线程还是多进程效率都要被吊打,多线程之间切换耗费cpu寄存器资源,OS 调度的不太可控,多进程间通信不便的问题。
后来Python改进了语法,引入了yiled from充当协程调度,后来有人根据这个新特性开发了第三方协程框架,Tornado,Gevent等。
在这场效率之争里,Python这么受欢迎的语言,官方怎么能默不出声?所以Python之父深入简出3年,苦心钻研自家的协程,async/await和asyncio库,并放到Python3.5后成为远程原生的协程,
对于类似爬虫这种延时的IO操作,协程是个大利器,优点很多,他可以在一个阻塞发生时,挂起当前程序,跑去执行其他程序,把事件注册到循环中,实现多程序并发,据说超越了10k限制,不过我没有试验过极限。
现在讲一讲协程的简单的用法,当你爬一个网站,有100个网页,正常是请求一次,回来一次

for url in urls:
  response=get(url)
  results=parse(response)

这样效率很低,但协程可以一次发起100个请求(其实也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,挂起100个,然后同时等待100个返回,效率提升了100倍。可以理解为同时做100件事,相对于多线程,做到了由自己调度而不是交给CPU,程序流程可控,节约资源,效率极大提升。

async def get(url):#定义协程抓取函数,这里用了aiohttp库
  
    async with aiohttp.ClientSession() as session:#协程上下文
        async with session.get(url) as response:
            return await response.text()
#await 是挂起命令,挂起当前,执行response.text(),response.text()执行完成后重新激活当前函数继续运行,返回。

如果response.text迟迟不回,程序不会死等,而是去你定义的任务循环中寻找另一个任务(如果有的话),如果没有循环任务,那就死等咯。。。毕竟总要有等返回结果的。
所以实现协程就是要实现多个任务的循环。用一张简单的图表示。


timg.jpg

也就是说任务一直跑,每到一个地方awit一次,然后await返回,直到最终全部返回,主程序结束。

调用协程
协程不能直接运行,需要把协程加入到事件循环(loop)。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

import time
import asyncio

now = lambda : time.time()
async def do_some_work(x):
    print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('TIME: ', now() - start)

结果

Waiting:  2
TIME:  0.00016564312531

关于task

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)

输出结果为:

<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
TIME:  0.0003490447998046875

创建task后,task在加入事件循环之前是pending状态,当loop事件开始循环,所有pending状态的task都开始执行到await那一步(函数体内并非如此),不管loop里是否调用

asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。isinstance(task, asyncio.Future)将会输出True。

就是这么个道理,协成里面也可以放yield 表达式,变成生成器式协程,可以使用 async for x in 生成式协程。比如

async def duo():
    print('多多')
    await asyncio.sleep(4)
    eiei=yield 666
    print(eiei)

要想取到duo函数弹出的666,需要用

async for i in duo():
    i+=100
    print(i)

再举一个例子,典型的消费,生产者模型,说是买土豆,实际是拿字典,来源知乎,我做了修改。

from time import sleep
from random import randint,random
import asyncio
all_potatos={x:randint(1,50) for x  in "abcd"}

async def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potato()
        else:
            potato = all_potatos.popitem()
            yield potato
            count += 1
            if count == num:
                break

async def buy_potatos():
    bucket = []
    async for p in take_potatos(20):
        bucket.append(p)
        print(bucket)

async def ask_for_potato():
    await asyncio.sleep(3)
    all_potatos.update({x:randint(1,20) for x  in "临兵斗者皆阵列在前"})

def main():
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([buy_potatos()])  )
    loop.close()

if __name__ == '__main__':
    main()

在实际生产环境中,最常见的依然是生产-消费模型,用队列串联,在任务开始就初始化多个worker,用join方法挂起任务,让worker无限循环,get队列里的值。直到队列为空。以下这个例子。

import time
import asyncio
from asyncio import Queue


def now(): return time.time()


async def worker(q):#工作者消费队列
    print('Start worker')

    while 1:#无限循环
        start = now()
        task = await q.get()#开始消费
        if not task:
            await asyncio.sleep(1)
            continue
        print('working on ', int(task))
        await asyncio.sleep(int(task))
        q.task_done()#队列通知
        print('Job Done for ', task, now() - start)


async def generate_run(q):#生成worker线程函数
    asyncio.ensure_future(worker(q))
    asyncio.ensure_future(worker(q))#先弄了两个worker去跑
    await q.join()主线程挂起等待队列完成通知
    jobs = asyncio.Task.all_tasks()完成后收集所有线程,这里是3个,算上自己
    print('是否已经关闭任务', asyncio.gather(*jobs).cancel())#关闭线程方法,返回True


def main():

    loop = asyncio.get_event_loop()
    q = Queue()
    for i in range(3):
        q.put_nowait(str(i))#一定要放入字符,数字0是空,队列一直不会结束。
    loop.run_until_complete(generate_run(q))#启动生成函数

    loop.close()


if __name__ == '__main__':
    main()


上一篇 下一篇

猜你喜欢

热点阅读