Python协程python开发Python

Python 期物之 asyncio.Future

2018-03-03  本文已影响17人  宝宝家的隔壁老王
class Future:
    """
    这个类同 concurrent.futures.Future基本上是兼容的
    
    Difference:
    
    - result() 和 exception() 不提供 timeout 参数,并且如果期物没有执行完毕会 raise 异常
    
    - 使用 add_done_callback() 注册的回调只能通过循环事件的 call_soon_threadsafe() 执行
    
    - 同 wait() 和 as_completed() 方法不兼容
    
    (Python 3.4 以及后面可能会统一这一块的处理)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """
        使用 loop 初始化期物实例
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    # 格式化展示回调函数
    def __format_callbacks(self):
        cb = self._callbacks
        size = len(cb)
        if not size:
            cb = ''

        def format_cb(callback):
            return events._format_callback_source(callback, ())

        if size == 1:
            cb = format_cb(cb[0])
        elif size == 2:
            cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
        elif size > 2:
            cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
                                            size-2,
                                            format_cb(cb[-1]))
        return 'cb=[%s]' % cb

    # 格式化展示信息
    def _repr_info(self):
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is not None:
                info.append('exception={!r}'.format(self._exception))
            else:
                # use reprlib to limit the length of the output, especially
                # for very long strings
                result = reprlib.repr(self._result)
                info.append('result={}'.format(result))
        if self._callbacks:
            info.append(self.__format_callbacks())
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        info = self._repr_info()
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """
        取消期物,并且执行回调。如果期物已经 done 或者 cancelled,返回 False。
        变更期物的状态,通知事件循环去安排回调,并且返回 True
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """
        通知事件循环尽快的执行回调,清空 callback 列表
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """
        期物已经运行结束或取消都是 done 的状态
        """
        
        return self._state != _PENDING

    def result(self):
        """
        不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。如果 _exception 值存在,则 raise 对应的值,否则返回 _result值
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """
        不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。返回 _exception 值
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """
        给期物运行完毕添加方法回调
        如果期物已经运行完毕,则立即回调
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """
        移除指定回调
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """
        标记期物运行结束,设置 _result 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()  # 运行回调

    def set_exception(self, exception):
        """
        标记期物运行结束,设置 _exception 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError
        """idStateError.
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        # 兼容 async/await 方法
        __await__ = __iter__ # make compatible with 'await' expression
上一篇下一篇

猜你喜欢

热点阅读