使用 asyncio 包处理并发
2018-03-20 本文已影响108人
一块大番薯
并发与并行(Concurrency、Parallelism)
- 真正的并行需要多个核心
- 例如 4 个核心 CPU,最多只能 4 并行
如果每个核心运行 3 个线程,则 4 并行,12 并发,12 线程 - 并发更有用
asyncio 包
- 使用协程(事件循环驱动)实现并发
- 巧妙的回调:
yield from ... 变成 yield From(...)、return result 变成 raise Return(result)
使用线程实现并发(threading 模块)
Python 没有提供终止线程的 API,所以要关闭线程,必须给线程发送信息
此处利用 signal.go 属性,从外部控制线程。
import sys
import time
import threading
import itertools
class Signal:
go = True
def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符(\x08)把光标移回来
time.sleep(.1)
if not signal.go:
break
write(' ' * len(status) + '\x08' * len(status))
def slow_func():
time.sleep(3)
return 42
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner)
spinner.start()
result = slow_func()
signal.go = False
spinner.join() # 等待 spinner 线程结束
return result
def main():
result = supervisor()
print('Answer:', result)
if __name__ == '__main__':
main()
使用协程实现并发(asyncio 模块)
-
严格的协程定义:
asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield -
适合 asyncio 的协程要由调用方通过 yield from 调用
或者把协程传给 asyncio 包中某个函数 -
@asyncio.coroutine 装饰器应该应用在协程上。其一凸显协程,其二有助调试(即如果协程没有产生值,则被垃圾回收,发出warning)。但不会预激协程。
-
yield from 把控制权交给事件循环。而类似 time.sleep(.1) 反而阻塞事件循环
import sys
import asyncio
import itertools
@asyncio.coroutine
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1) # 协程 asyncio.sleep 通过 yield from 调用
except asyncio.CancelledError:
break
write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_func():
yield from asyncio.sleep(3) # 把控制权交给主循环
return 42
@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin('thinking!')) # 协程 spin 通过 asyncio.async 调用
print('spinner object:', spinner) # Task 对象,类似 Thread 对象
result = yield from slow_func() # 协程 slow_func 通过 yield from 调用
spinner.cancel()
return result
def main():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(supervisor()) # 驱动协程
loop.close()
print('Answer:', result)
if __name__ == '__main__':
main()
对比线程和协程:
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner)
spinner.start()
result = slow_func()
signal.go = False
spinner.join()
return result
@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin('thinking!'))
print('spinner object:', spinner)
result = yield from slow_func()
spinner.cancel()
return result
- asyncio.Task 和 threading.Thread 等效
前者驱动协程 spin,后者调用可调用对象 spin - Task 对象可由 asyncio.async(coroutine) 或 loop.create_task(...) 创建
- Task 对象创建时自动排定运行时间,而 Thread 对象需调用 start 方法
- 线程终止靠外部控制
任务终止可使用 Task.cancel 方法,在协程内部抛出 CanceledError - 联系:阻塞的操作通过协程来实现。而协程会把控制权交给主循环。
如 time.sleep(3) 替换成 asyncio.sleep(3)
又如 requests.get(url) 替换成 aiohttp.request('GET', url)