Python

Python:协程中 Task 和 Future 的理解及使用

2020-11-03  本文已影响0人  dex0423

1. Task 概念及用法

注意:Python 3.7 以后的版本支持 asyncio.create_task(),在此之前的写法为 loop.create_task(),开发过程中需要注意代码写法对不同版本 python 的兼容性。

当多个 Task 被加入一个 task_list 的时候,添加 Task 的过程中 Task 不会执行,必须要用 await asyncio.wait()await asyncio.gather() 将 Task 对象加入事件循环中异步执行。

注意:创建 Task 对象时,除了可以使用 asyncio.create_task() 之外,还可以用最低层级的 loop.create_task()asyncio.ensure_future(),他们都可以用来创建 Task 对象,其中关于 ensure_future 相关内容本文接下来会一起讲。

import asyncio
import arrow


def current_time():
    '''
    获取当前时间
    :return:
    '''
    cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
    return cur_time


async def func(sleep_time):
    func_name_suffix = sleep_time        # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
    print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
    await asyncio.sleep(sleep_time)
    print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
    return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"


async def run():
    task_list = []
    for i in range(5):
        task = asyncio.create_task(async_func(i))
        task_list.append(task)

    done, pending = await asyncio.wait(task_list, timeout=None)
    for done_task in done:
        print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())


if __name__ == '__main__':
    main()

代码执行结果如下:

[2020-11-03 22:45:53] 执行异步函数 func-0
[2020-11-03 22:45:53] 执行异步函数 func-1
[2020-11-03 22:45:53] 执行异步函数 func-2
[2020-11-03 22:45:53] 执行异步函数 func-3
[2020-11-03 22:45:53] 执行异步函数 func-4
[2020-11-03 22:45:53] 函数 func-0 执行完毕
[2020-11-03 22:45:54] 函数 func-1 执行完毕
[2020-11-03 22:45:55] 函数 func-2 执行完毕
[2020-11-03 22:45:56] 函数 func-3 执行完毕
[2020-11-03 22:45:57] 函数 func-4 执行完毕
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:57] 得到函数 func-4 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:55] 得到函数 func-2 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:53] 得到函数 func-0 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:56] 得到函数 func-3 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:54] 得到函数 func-1 执行结果】

2. Future 概念解读

正因为 ensure_future 函数最后返回的一定是一个 awaitable 对象,所以才保证了继承自 Future 的 Task 是 awaitable 的。
同时,协程对象一位内无法自己执行,需要将其注册到事件循环中转变为一个 Task 对象才会被执行,所以协程对象一定 awaitable 的。

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or`yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional event_loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future uses
        the default event loop.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    _repr_info = base_futures._future_repr_info

    def __repr__(self):
        return '<{} {}>'.format(self.__class__.__name__,
                                ' '.join(self._repr_info()))

    def __del__(self):
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    @property
    def _log_traceback(self):
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        if bool(val):
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    def get_loop(self):
        """Return the event loop the Future is bound to."""
        return self._loop

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self.__schedule_callbacks()
        self.__log_traceback = True

    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.
上一篇 下一篇

猜你喜欢

热点阅读