Python3 异步协程函数async具体用法
之前使用Python的人往往纠缠在多线程,多进程,评判哪个效率更高?
其实,相对于别家的协程和异步,不管多线程还是多进程效率都要被吊打,多线程之间切换耗费cpu寄存器资源,OS 调度的不太可控,多进程间通信不便的问题。
后来Python改进了语法,引入了yiled from充当协程调度,后来有人根据这个新特性开发了第三方协程框架,Tornado,Gevent等。
在这场效率之争里,Python这么受欢迎的语言,官方怎么能默不出声?所以Python之父深入简出3年,苦心钻研自家的协程,async/await和asyncio库,并放到Python3.5后成为远程原生的协程,
对于类似爬虫这种延时的IO操作,协程是个大利器,优点很多,他可以在一个阻塞发生时,挂起当前程序,跑去执行其他程序,把事件注册到循环中,实现多程序并发,据说超越了10k限制,不过我没有试验过极限。
现在讲一讲协程的简单的用法,当你爬一个网站,有100个网页,正常是请求一次,回来一次
for url in urls:
response=get(url)
results=parse(response)
这样效率很低,但协程可以一次发起100个请求(其实也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,挂起100个,然后同时等待100个返回,效率提升了100倍。可以理解为同时做100件事,相对于多线程,做到了由自己调度而不是交给CPU,程序流程可控,节约资源,效率极大提升。
async def get(url):#定义协程抓取函数,这里用了aiohttp库
async with aiohttp.ClientSession() as session:#协程上下文
async with session.get(url) as response:
return await response.text()
#await 是挂起命令,挂起当前,执行response.text(),response.text()执行完成后重新激活当前函数继续运行,返回。
如果response.text迟迟不回,程序不会死等,而是去你定义的任务循环中寻找另一个任务(如果有的话),如果没有循环任务,那就死等咯。。。毕竟总要有等返回结果的。
所以实现协程就是要实现多个任务的循环。用一张简单的图表示。
![](https://img.haomeiwen.com/i2596401/541e01f6017d0789.jpg)
也就是说任务一直跑,每到一个地方awit一次,然后await返回,直到最终全部返回,主程序结束。
调用协程
协程不能直接运行,需要把协程加入到事件循环(loop)。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('TIME: ', now() - start)
结果
Waiting: 2
TIME: 0.00016564312531
关于task
协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。
import asyncio
import time
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
输出结果为:
<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
TIME: 0.0003490447998046875
创建task后,task在加入事件循环之前是pending状态,当loop事件开始循环,所有pending状态的task都开始执行到await那一步(函数体内并非如此),不管loop里是否调用
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。isinstance(task, asyncio.Future)将会输出True。
就是这么个道理,协成里面也可以放yield 表达式,变成生成器式协程,可以使用 async for x in 生成式协程。比如
async def duo():
print('多多')
await asyncio.sleep(4)
eiei=yield 666
print(eiei)
要想取到duo函数弹出的666,需要用
async for i in duo():
i+=100
print(i)
再举一个例子,典型的消费,生产者模型,说是买土豆,实际是拿字典,来源知乎,我做了修改。
from time import sleep
from random import randint,random
import asyncio
all_potatos={x:randint(1,50) for x in "abcd"}
async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
else:
potato = all_potatos.popitem()
yield potato
count += 1
if count == num:
break
async def buy_potatos():
bucket = []
async for p in take_potatos(20):
bucket.append(p)
print(bucket)
async def ask_for_potato():
await asyncio.sleep(3)
all_potatos.update({x:randint(1,20) for x in "临兵斗者皆阵列在前"})
def main():
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([buy_potatos()]) )
loop.close()
if __name__ == '__main__':
main()
在实际生产环境中,最常见的依然是生产-消费模型,用队列串联,在任务开始就初始化多个worker,用join方法挂起任务,让worker无限循环,get队列里的值。直到队列为空。以下这个例子。
import time
import asyncio
from asyncio import Queue
def now(): return time.time()
async def worker(q):#工作者消费队列
print('Start worker')
while 1:#无限循环
start = now()
task = await q.get()#开始消费
if not task:
await asyncio.sleep(1)
continue
print('working on ', int(task))
await asyncio.sleep(int(task))
q.task_done()#队列通知
print('Job Done for ', task, now() - start)
async def generate_run(q):#生成worker线程函数
asyncio.ensure_future(worker(q))
asyncio.ensure_future(worker(q))#先弄了两个worker去跑
await q.join()主线程挂起等待队列完成通知
jobs = asyncio.Task.all_tasks()完成后收集所有线程,这里是3个,算上自己
print('是否已经关闭任务', asyncio.gather(*jobs).cancel())#关闭线程方法,返回True
def main():
loop = asyncio.get_event_loop()
q = Queue()
for i in range(3):
q.put_nowait(str(i))#一定要放入字符,数字0是空,队列一直不会结束。
loop.run_until_complete(generate_run(q))#启动生成函数
loop.close()
if __name__ == '__main__':
main()