Starlette 解读 by Gascognya

Starlette 源码阅读 (十) 异步与后台任务

2020-08-17  本文已影响0人  Gascognya

background.py & concurrency.py

实际上笔者对于异步的深层原理了解并不透彻,还只停留在勉强会用的水平。日后会对这方面只是进行系统性自下而上的学习。

后台任务

代码十分简单,在此仅贴出来

class BackgroundTask:
    def __init__(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)


class BackgroundTasks(BackgroundTask):
    def __init__(self, tasks: typing.Sequence[BackgroundTask] = []):
        self.tasks = list(tasks)

    def add_task(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        task = BackgroundTask(func, *args, **kwargs)
        self.tasks.append(task)

    async def __call__(self) -> None:
        for task in self.tasks:
            await task()

异步

提供了一种异步循环方式,还有一个线程池

T = typing.TypeVar("T")
async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
    # 直到其中一个完成
    # 在StreamingResponse中使用过
    # await run_until_first_complete(
    #    (self.stream_response, {"send": send}),
    #    (self.listen_for_disconnect, {"receive": receive}),
    # )
    tasks = [handler(**kwargs) for handler, kwargs in args]
    (done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    [task.cancel() for task in pending]
    [task.result() for task in done]


async def run_in_threadpool(
    func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
    # 线程池
    loop = asyncio.get_event_loop()
    if contextvars is not None:  # pragma: no cover
        # 确保我们运行在相同的上下文中
        child = functools.partial(func, *args, **kwargs)
        context = contextvars.copy_context()
        func = context.run
        args = (child,)
    elif kwargs:  # pragma: no cover
        # run_in_executor不接受“kwargs”,因此将它们绑定到这里
        func = functools.partial(func, **kwargs)
    return await loop.run_in_executor(None, func, *args)


class _StopIteration(Exception):
    pass


def _next(iterator: Iterator) -> Any:
    # 我们不能从线程池迭代器内部触发‘StopIteration’,
    # 然后在上下文外部捕获它,所以我们强制它们进入不同的异常类型。
    try:
        return next(iterator)
    except StopIteration:
        raise _StopIteration


async def iterate_in_threadpool(iterator: Iterator) -> AsyncGenerator:
    while True:
        try:
            yield await run_in_threadpool(_next, iterator)
        except _StopIteration:
            break

上一篇 下一篇

猜你喜欢

热点阅读