Happily Learning asyncio
2018-02-10
宝宝家的隔壁老王
Truth be told, there is nothing happy about learning asyncio.
I. Python's Multithreading and Multiprocessing
To understand asyncio's asynchronous programming, it helps to first know a little about Python's multithreading and multiprocessing.
1. Multithreading
Because Python has the GIL, Python's multithreading is not true multithreading: threads only run genuinely in parallel while they are sleeping or waiting on I/O.
1.1 Sleeping
- time.sleep()
- threading.Lock
- other synchronization objects in the threading module
1.2 I/O
- requests
- open
1.3 Periodically relinquishing the GIL
- The Python 2 interpreter releases the GIL every 1000 bytecodes
- The Python 3 interpreter releases the GIL every 15 ms
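The Python 3 interval is not fixed: it is exposed and tunable via the sys module. A small sketch (note: on current CPython 3 builds the default reported by sys.getswitchinterval() is 5 ms; the 15 ms figure circulates in older write-ups):

```python
import sys

# inspect the current GIL switch interval (seconds)
print(sys.getswitchinterval())   # 0.005 by default on current CPython 3

# it can be changed at runtime; a larger value means fewer forced switches
sys.setswitchinterval(0.005)
```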
1.4 The GIL (Global Interpreter Lock)
- One thread runs Python while the other N sleep or wait on I/O (this guarantees that only one thread at a time accesses shared interpreter state).
1.5 How the GIL works
/* s.connect((host, port)) method */
static PyObject *
sock_connect(PySocketSockObject *s, PyObject *addro)
{
    sock_addr_t addrbuf;
    int addrlen;
    int res;

    /* convert (host, port) tuple to C address */
    getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen);

    Py_BEGIN_ALLOW_THREADS
    res = connect(s->sock_fd, addr, addrlen);
    Py_END_ALLOW_THREADS

    /* error handling and so on .... */
}
- Py_BEGIN_ALLOW_THREADS releases the GIL
- Py_END_ALLOW_THREADS re-acquires the GIL; a thread blocks at this point waiting for another thread to release the lock; once that happens, the waiting thread grabs the lock back and resumes executing Python code
- In short: one thread runs Python while N threads block on network I/O or wait to re-acquire the GIL
1.6 Examples
- Blocking on sleep
import time
from threading import Thread
from datetime import datetime

def write(i):
    print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))
    time.sleep(4)
    print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i))

def fun():
    print('start ...')
    for i in range(3):
        Thread(target=write, args=(i,), daemon=False).start()
    print('end ...')

# Output:
# start ...
# 2018-02-09 23:58:25 start write --> 0
# 2018-02-09 23:58:25 start write --> 1
# 2018-02-09 23:58:25 start write --> 2
# end ...
# 2018-02-09 23:58:29 end write --> 0
# 2018-02-09 23:58:29 end write --> 1
# 2018-02-09 23:58:29 end write --> 2
- Blocking on CPU
import time
from threading import Thread
from datetime import datetime

def write(n):
    print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))
    l, sum_ = list(range(100000000)), 0
    for i in l:
        sum_ += i
    print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n))

def fun():
    print('start ...')
    for i in range(3):
        Thread(target=write, args=(i,), daemon=False).start()
    print('end ...')

# Output:
# start ...
# 2018-02-10 00:13:55 start write --> 0
# 2018-02-10 00:13:58 start write --> 1
# 2018-02-10 00:14:02 start write --> 2
# end ...
# 2018-02-10 00:14:27 end write --> 0
# 2018-02-10 00:14:32 end write --> 1
# 2018-02-10 00:14:35 end write --> 2
- Summary
- For sleeping or I/O-bound work, multithreading clearly pays off, noticeably reducing total elapsed time;
- For CPU-bound work, multithreading actually increases total elapsed time, because the threads contend with each other for the GIL.
2. Multiprocessing
Python's multiprocessing is the same multiprocessing concept found in other languages, so it needs no further elaboration.
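Unlike the threaded CPU-bound example above, processes each carry their own interpreter and GIL, so CPU work really runs in parallel. A minimal sketch using multiprocessing.Pool (the summing task mirrors the earlier example):

```python
from multiprocessing import Pool

def cpu_task(n):
    # CPU-bound work that would serialize under the GIL with threads
    return sum(range(n))

if __name__ == '__main__':
    # each worker is a separate process with its own interpreter and GIL
    with Pool(processes=3) as pool:
        print(pool.map(cpu_task, [10, 100, 1000]))  # [45, 4950, 499500]
```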
II. The Ideas Behind Asynchronous Programming
- 1. Coroutines (coroutine)
- 2. Tasks (Task)
- 3. The event loop (loop)
1. A Task object holds two attributes: the coroutine (coro) and the time at which to call it;
2. A Loop object keeps its Task objects in a heap and runs the coro of the Task with the smallest call time. If that coro is not yet exhausted, it is wrapped into a new task and pushed back onto the Loop's heap.
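The two points above can be sketched as a toy scheduler — this is not asyncio's actual implementation, just the heap-of-tasks idea in miniature (the Task/Loop names here are illustrative):

```python
import heapq
import time

class Task:
    # pairs a generator-based coro with the time it should next be called
    def __init__(self, when, coro):
        self.when, self.coro = when, coro
    def __lt__(self, other):
        return self.when < other.when

class Loop:
    def __init__(self):
        self.heap = []
    def create_task(self, coro, delay=0):
        heapq.heappush(self.heap, Task(time.monotonic() + delay, coro))
    def run(self):
        while self.heap:
            task = heapq.heappop(self.heap)  # task with the smallest call time
            time.sleep(max(0, task.when - time.monotonic()))
            try:
                delay = next(task.coro)      # step the coro; it yields its next delay
            except StopIteration:
                continue                     # coro exhausted: drop it
            # not exhausted: wrap into a new task and push it back onto the heap
            heapq.heappush(self.heap, Task(time.monotonic() + delay, task.coro))

def coro(name, log):
    for i in range(2):
        log.append((name, i))
        yield 0.01  # "sleep" 10 ms between steps

log = []
loop = Loop()
loop.create_task(coro('a', log))
loop.create_task(coro('b', log))
loop.run()
print(log)  # the two coros' steps interleave
```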
# A minimal invocation example (note: a coro must yield from awaitables such
# as asyncio.sleep(); yielding plain values like range(10) would make the
# Task raise "Task got bad yield" at run time)
import asyncio

@asyncio.coroutine
def coro_fun():
    yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
loop.run_until_complete(coro_fun())
# or
tasks = [asyncio.ensure_future(coro_fun())]
loop.run_until_complete(asyncio.wait(tasks))
III. Source Code Analysis
- About __code__.co_flags
# Every function and method has a __code__ attribute with a co_flags value.
# In CPython:
#   1. generator functions carry CO_GENERATOR, i.e. 0x20
#   2. coroutine functions carry CO_COROUTINE, i.e. 0x80
#   3. CO_ITERABLE_COROUTINE is 0x100 (so 0x180 masks both coroutine flags)
# AND-ing __code__.co_flags with the relevant flag tells you whether a
# function object is a generator function or a coroutine function.

def gen_fun():
    yield from range(10)

>>> gen_fun.__code__.co_flags  # 99
>>> 99 & 0x20   # 32, True
>>> 99 & 0x180  # 0, False

async def asy_fun():
    await sleep(4)

>>> asy_fun.__code__.co_flags  # 227
>>> 227 & 0x20   # 32, True
>>> 227 & 0x180  # 128, True
- About type checking
from collections.abc import Iterable, Awaitable
import inspect

# Checking for Iterable and Awaitable objects: an object with __iter__ is
# iterable, one with __await__ is awaitable
class A:
    def __iter__(self):
        return iter([1, 2, 3, 4, 5])
    def __await__(self):
        return iter([1, 2, 3, 4, 5])

a = A()
>>> isinstance(a, Iterable)   # True
>>> isinstance(a, Awaitable)  # True

# Checking for a coroutine object, etc.
async def asy_fun():
    await a

>>> inspect.iscoroutine(asy_fun())  # True
- @asyncio.coroutine
def coroutine(func):
    # Mark a generator as a coroutine; if it is never called before being
    # destroyed, an error is logged.
    # This check uses inspect.iscoroutinefunction(), which returns True for
    # generators decorated with types.coroutine and for functions defined
    # with the async def syntax
    if _inspect_iscoroutinefunction(func):
        return func

    # Use co_flags to decide whether func is a generator function
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            # Is res a future, a generator, or a CoroWrapper instance?
            if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
                    isinstance(res, CoroWrapper):
                res = yield from res
            elif _AwaitableABC is not None:
                # The Awaitable ABC only exists on Python 3.5+
                try:
                    # If present, __await__ must return an iterator that is
                    # not a coroutine
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    # If res is an Awaitable object
                    if isinstance(res, _AwaitableABC):
                        # delegate to its iterator with yield from
                        res = yield from await_meth()
            return res

    # Wrap coro with types.coroutine (stacking @types.coroutine multiple
    # times is harmless: it returns the already-decorated value directly)
    if not _DEBUG:
        if _types_coroutine is None:
            wrapper = coro
        else:
            wrapper = _types_coroutine(coro)
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            # Debug mode: wrap with the coroutine wrapper
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                del w._source_traceback[-1]
            # On Python 3.5 copy over coroutine-object attributes, otherwise
            # generator-object attributes
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    # Makes asyncio.iscoroutinefunction() report True elsewhere
    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper
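What the decorator ultimately buys you can be observed from the outside. A sketch using only current public APIs (on modern Pythons @asyncio.coroutine itself has been removed, so this contrasts the two cases the decorator bridged):

```python
import asyncio

async def native():
    await asyncio.sleep(0)

def plain_gen():
    yield from range(3)

# async def functions are already recognized as coroutine functions,
# so coroutine() would return them unchanged
print(asyncio.iscoroutinefunction(native))     # True

# a bare generator function is not; setting wrapper._is_coroutine is
# exactly what made decorated generators pass this same check
print(asyncio.iscoroutinefunction(plain_gen))  # False
```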
- @types.coroutine
def coroutine(func):
    # Convert a regular generator function into a coroutine
    if not callable(func):
        raise TypeError('types.coroutine() expects a callable')

    if (func.__class__ is FunctionType and
            getattr(func, '__code__', None).__class__ is CodeType):

        # Fetch the function's co_flags
        co_flags = func.__code__.co_flags

        # Already a coroutine function?
        if co_flags & 0x180:
            return func

        # A generator function? The key step: OR the generator's co_flags
        # with 0x100, re-marking it as an iterable coroutine
        if co_flags & 0x20:
            # TODO: Implement this in C.
            co = func.__code__
            func.__code__ = CodeType(
                co.co_argcount, co.co_kwonlyargcount, co.co_nlocals,
                co.co_stacksize,
                co.co_flags | 0x100,  # 0x100 == CO_ITERABLE_COROUTINE
                co.co_code,
                co.co_consts, co.co_names, co.co_varnames, co.co_filename,
                co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars,
                co.co_cellvars)
            return func

    # Support generator-like objects
    @_functools.wraps(func)
    def wrapped(*args, **kwargs):
        coro = func(*args, **kwargs)
        # Coroutine objects, and generator objects whose co_flags carry
        # 0x100, are returned as-is
        if (coro.__class__ is CoroutineType or
                coro.__class__ is GeneratorType and
                coro.gi_code.co_flags & 0x100):
            return coro
        if (isinstance(coro, _collections_abc.Generator) and
                not isinstance(coro, _collections_abc.Coroutine)):
            # Implements the Generator ABC's methods: wrap it into a
            # generator with _GeneratorWrapper
            return _GeneratorWrapper(coro)
        # A Coroutine ABC instance, or some other object
        return coro
    return wrapped
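A small usage sketch of the flag-flipping path above: decorating a plain generator sets CO_ITERABLE_COROUTINE (0x100) on its code object, after which the generator objects it produces can be awaited:

```python
import asyncio
import types

@types.coroutine
def ticker():
    # a plain generator function; the decorator ORs 0x100 into its co_flags
    yield from asyncio.sleep(0)
    return 42

# the CO_ITERABLE_COROUTINE bit is now set on the code object
print(ticker.__code__.co_flags & 0x100)  # 256

async def main():
    # the generator object is awaitable thanks to the flag
    return await ticker()

print(asyncio.run(main()))  # 42
```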
- asyncio.get_event_loop
- 1. DefaultEventLoopPolicy().get_event_loop()
# DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    # Event loop policy that also watches child processes
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        # If the child watcher is already set, calling .set_event_loop()
        # from the main thread also calls .attach_loop(loop) on the watcher
        super().set_event_loop(loop)
        if self._watcher is not None and \
                isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
        if self._watcher is not None:
            self._watcher.close()
        self._watcher = watcher
- 2. The parent class: BaseDefaultEventLoopPolicy().get_event_loop()
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    # Under this policy, every thread has its own event loop; however, we
    # only create one by default in the main thread — other threads get no
    # event loop by default.
    # Other policies may behave differently (e.g. a single global event
    # loop, an event loop auto-created per thread, or event loops associated
    # via some other context).
    _loop_factory = None

    class _Local(threading.local):
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        # Auto-create only if this is the main thread, self._local._loop is
        # None, and self._local._set_called is False
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        # Install the event loop
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        # Install the result via set_event_loop(), since that also takes
        # care of setting _local._set_called
        return self._loop_factory()
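The per-thread behavior described in get_event_loop() above can be seen directly: a non-main thread gets no loop automatically and must install its own. A minimal sketch:

```python
import asyncio
import threading

results = {}

def worker():
    # non-main threads get no event loop automatically under the default
    # policy, so this raises RuntimeError
    try:
        asyncio.get_event_loop()
    except RuntimeError as exc:
        results['error'] = str(exc)
    # they must create and install one explicitly
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    results['installed'] = asyncio.get_event_loop() is loop
    loop.close()

t = threading.Thread(target=worker)
t.start()
t.join()
print(results)
```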
- 3. The _UnixSelectorEventLoop event-loop factory class
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        self._signal_handlers = {}

    def _socketpair(self):
        return socket.socketpair()

    def close(self):
        super().close()
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
- BaseEventLoop().run_until_complete
def run_until_complete(self, future):
    """
    1. The argument is a coroutine; asyncio.wait() also returns a coroutine
    2. The coroutine is wrapped into a future
    3. Runs until the future's state is done
    """
    # Check whether the loop is closed
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    # Add the done callback
    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
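A minimal usage sketch of the flow above: the coroutine is wrapped into a Task, the loop runs until it is done, and the future's result is returned:

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0)
    return a + b

loop = asyncio.new_event_loop()
try:
    # run_until_complete wraps the coroutine via ensure_future, runs the
    # loop until the future is done, then returns future.result()
    print(loop.run_until_complete(add(1, 2)))  # 3
finally:
    loop.close()
```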
- asyncio.ensure_future
def ensure_future(coro_or_future, *, loop=None):
    # Wrap a coroutine or an awaitable object into a future (Future)
    if isinstance(coro_or_future, futures.Future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        # Already a future: return it unchanged
        return coro_or_future
    # Is it a generator-based coroutine or a coroutine wrapper?
    elif coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        # Create a Task object
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    # On Python 3.5+, check whether the object is awaitable
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('A Future, a coroutine or an awaitable is required')
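The two main branches above can be exercised directly — a coroutine becomes a Task, while an existing Future passes through unchanged (sketch):

```python
import asyncio

async def coro():
    return 'done'

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # a coroutine is wrapped into a Task (a Future subclass)
    task = asyncio.ensure_future(coro())
    print(isinstance(task, asyncio.Task))       # True
    # a Future passed in is returned unchanged
    print(asyncio.ensure_future(task) is task)  # True
    print(loop.run_until_complete(task))        # done
finally:
    asyncio.set_event_loop(None)
    loop.close()
```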
- asyncio.wait
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """
    fs is a non-empty sequence of futures or coroutines.

    Returns two sets of Futures: (done, pending).

    Usage:
        done, pending = yield from asyncio.wait(fs)
    """
    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # Convert every element of fs into a future object
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return (yield from _wait(fs, timeout, return_when, loop))
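A usage sketch of the return_when parameter: with FIRST_COMPLETED, wait() returns as soon as one future finishes, leaving the rest in the pending set (newer Pythons require passing Tasks rather than bare coroutines):

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    # wrap the coroutines into Tasks up front
    tasks = [asyncio.ensure_future(worker(d)) for d in (0.01, 0.5)]
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return len(done), len(pending)

print(asyncio.run(main()))  # (1, 1)
```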
- asyncio._wait
@coroutine
def _wait(fs, timeout, return_when, loop):
    # fs must be a non-empty set
    assert fs, 'Set of Futures is empty.'
    # Create a future object attached to the current loop
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        # Schedule the timeout callback for the given time
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Declare use of the enclosing scope's counter
        nonlocal counter
        counter -= 1
        # The waiter future finishes in any of three cases:
        # 1. every future has completed
        # 2. return_when is FIRST_COMPLETED
        # 3. return_when is FIRST_EXCEPTION, the future was not cancelled,
        #    and f.exception() is set
        if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
            # Cancel the timeout callback if it exists
            if timeout_handle is not None:
                timeout_handle.cancel()
            # Mark the waiter future as finished
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        # Add the completion callback to every future
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        # Remove _on_completion from the done-callback list
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
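The timeout_handle path above can be observed from the public API: when the timeout fires before any future completes, _release_waiter wakes the waiter and the unfinished futures come back in the pending set (sketch):

```python
import asyncio

async def slow():
    await asyncio.sleep(1)

async def main():
    task = asyncio.ensure_future(slow())
    # the timeout callback fires after 0.05 s, well before the slow task
    # finishes, so done is empty and the task is still pending
    done, pending = await asyncio.wait({task}, timeout=0.05)
    for p in pending:
        p.cancel()
    return len(done), len(pending)

print(asyncio.run(main()))  # (0, 1)
```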