Starlette 解读 by Gascognya

Starlette 源码阅读 (一) ASGI应用

2020-08-06  本文已影响0人  Gascognya

为了深入了解FastAPI框架,笔者决定将其基础库Starlette的源码进行阅读,未来也可能阅读uvicorn库的ASGI原理。个人水平十分有限,有错误的地方还请见谅。


Starlette文件结构

从applications.py开始

applications只包含了Starlette一个类

 class Starlette:
  """
        创建一个APP
        参数:
            (1) debug: 布尔值, 用于设置在出现错误时是否应返回调试回溯.

            (2) routers: 一个路由列表, 提供HTTP和WebSocket服务.

            (3) middleware: 一个中间件列表, 应用于每个request请求.
                一个starlette应用, 最少包含两个中间件:
                1. ServerErrorMiddleware中间件处于最外层, 以处理在整个堆栈中任何地方发生的未捕获错误.
                2. ExceptionMiddleware在最内层以处理处理路由或终结点中发生的异常情况.

            (4) exception_handlers: 一个字典, 键为状态码或者异常类, 值为其对应的调用函数.
                调用函数应该是handler(request, exc) -> response, 可以是标准函数, 也可以是异步函数.

            (5) on_startup: 一个启动时的调用项列表, 启动处理调用项, 不接受任何参数, 可以是同步函数, 也可以是异步函数.

            (6) on_shutdown: 一个关闭时的调用项列表, 同上.
    """
    def __init__(
        self,
        debug: bool = False,
        routes: typing.Sequence[BaseRoute] = None,
        middleware: typing.Sequence[Middleware] = None,
        exception_handlers: typing.Dict[
            typing.Union[int, typing.Type[Exception]], typing.Callable
        ] = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
    ) -> None:
        """
            lifespan上下文函数是一种较新的样式,
            它取代了on_startup/on_shutdown处理程序。
            两种方式任选其一,切勿一起使用。
        """
        assert lifespan is None or (
            on_startup is None and on_shutdown is None
        ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."

        self._debug = debug
        self.state = State()
        self.router = Router(
            routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
        )
        self.exception_handlers = (
            {} if exception_handlers is None else dict(exception_handlers)
        )
        self.user_middleware = [] if middleware is None else list(middleware)
        self.middleware_stack = self.build_middleware_stack()
        # build_middleware_stack函数用于构建中间件堆栈
    def build_middleware_stack(self) -> ASGIApp:
        """
            构建中间件堆栈
            返回值是ServerErrorMiddleware, 也就是最外层的中间件.其app属性指向上一个中间件, 以此类推
        """
        debug = self.debug
        error_handler = None
        exception_handlers = {}

        for key, value in self.exception_handlers.items():
            if key in (500, Exception):
                error_handler = value
            else:
                exception_handlers[key] = value
        # 将异常类和状态码分离出来保存

        middleware = (
            [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug,)]
            + self.user_middleware
            + [
                Middleware(
                    ExceptionMiddleware, handlers=exception_handlers, debug=debug,
                )
            ]
        )
        # 创建中间件元祖, 顺序为:
        # (1)最外层ServerErrorMiddleware(异常类)
        # (2)用户的中间件列表
        # (3)最内层ExceptionMiddleware(状态码字典)

        app = self.router
        for cls, options in reversed(middleware):
            app = cls(app=app, **options)
        # cls, options为middleware类的两个属性
        # 分别对应上面Middleware的第一个参数, 和后续参数.
        # 随后进行套娃, 迭代中第一个app中的app属性为self.router, 随后每个中间件app中的app属性, 皆指向上一个中间件app
        # 把最后一个套娃(应该是最外层的ServerErrorMiddleware)返回

        return app
    @property
    def routes(self) -> typing.List[BaseRoute]:
        return self.router.routes

    @property
    def debug(self) -> bool:
        return self._debug

    @debug.setter
    def debug(self, value: bool) -> None:
        self._debug = value
        self.middleware_stack = self.build_middleware_stack()

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        return self.router.url_path_for(name, **path_params)

App的启动入口

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope["app"] = self
        # 将自身装入到传过来的scope字典中, 然后传给中间件堆栈
        await self.middleware_stack(scope, receive, send)
        # 前面提到, middleware_stack实际上是最外层中间件ServerErrorMiddleware
        # 调用ServerErrorMiddleware的__call__
errors.py → ServerErrorMiddleware类

与上文相关的部分

class ServerErrorMiddleware:
    """
        当服务器发生错误时, 返回500响应

        如果debug设置了, 那么将返回traceback, 否则将调用指定的handler

        这个中间件类通常应该用于包含其他所有中间件, 这样保证任何地方出现异常
        都可以返回 500 response
    """

    def __init__(
        self, app: ASGIApp, handler: typing.Callable = None, debug: bool = False
    ) -> None:
        self.app = app
        self.handler = handler
        self.debug = debug

        # if key in (500, Exception):
        #     error_handler = value
        # [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug,)]
        # 中间件堆栈定义时的参数, handler为用户自定义的500处理:

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        # 如果请求类型不是http, 则转入下一个中间件

        response_started = False
        async def _send(message: Message) -> None:
            nonlocal response_started, send

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)
        # 做一个闭包传给下一个中间件, 让后续中间件使用send时,
        # 能够修改自身的response_started为True

        try:
            await self.app(scope, receive, _send)
        except Exception as exc:
            # 如果调用过_send, response_started为True
            # 下面代码将不会执行
            if not response_started:
                request = Request(scope)
                if self.debug:
                    # 在debug模式中, 将返回traceback
                    response = self.debug_response(request, exc)
                elif self.handler is None:
                    # 使用默认的handler处理
                    response = self.error_response(request, exc)
                else:
                    # 使用用户自定义的500 handler
                    if asyncio.iscoroutinefunction(self.handler):
                        response = await self.handler(request, exc)
                    else:
                        response = await run_in_threadpool(self.handler, request, exc)
                    # 判断是否为协程, 不是则以线程池方式运行
                await response(scope, receive, send)

            # 我们总是不断地抛出异常。
            # 这允许服务器记录错误,
            # 或者允许测试客户端在测试用例中选择性地提出错误。
            raise exc from None

调用

uvicorn.run(app, host="127.0.0.1", port=8000)
运行时将app传给ASGI服务器, 服务器调用时会沿着
app.__call__ → middleware1.__call__ → middleware2.__call__...
查看最内层中间件ExceptionMiddleware.__call__的部分代码

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        response_started = False

        async def sender(message: Message) -> None:
            nonlocal response_started

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            await self.app(scope, receive, sender)
        except Exception as exc:

会发现和最外层的ServerErrorMiddleware有极为相似, 暂且推测. 整个中间件堆栈中, 多次使用了将send闭包传给下层, 下层可能再做一次闭包将自己的response_started也包括进去.

其结构抽象化如下:
    async def send3(message):
        nonlocal response_started
        if message["type"] == "http.response.start":
            response_started = True
            # ExceptionMiddleware.response_started
        await def send2(message):
                nonlocal response_started
                if message["type"] == "http.response.start":
                    response_started = True
                    # XXXMiddleware.response_started
                await def send1(message):
                        nonlocal response_started
                        if message["type"] == "http.response.start":
                            response_started = True
                            # ServerErrorMiddleware.response_started
                        await send(message)

回到applications.py → Starlette类

英文注释在这里做了警告, 以下都是修改Starlette自身属性的方法

    # The following usages are now discouraged in favour of configuration
    #  during Starlette.__init__(...)
    # 以下方法, 现在不支持在__init__中配置使用
    def on_event(self, event_type: str) -> typing.Callable:
        return self.router.on_event(event_type)

    def mount(self, path: str, app: ASGIApp, name: str = None) -> None:
        self.router.mount(path, app=app, name=name)

    def host(self, host: str, app: ASGIApp, name: str = None) -> None:
        self.router.host(host, app=app, name=name)

    def add_middleware(self, middleware_class: type, **options: typing.Any) -> None:
        self.user_middleware.insert(0, Middleware(middleware_class, **options))
        self.middleware_stack = self.build_middleware_stack()

    def add_exception_handler(
        self,
        exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
        handler: typing.Callable,
    ) -> None:
        self.exception_handlers[exc_class_or_status_code] = handler
        self.middleware_stack = self.build_middleware_stack()

    def add_event_handler(self, event_type: str, func: typing.Callable) -> None:
        self.router.add_event_handler(event_type, func)

    def add_route(
        self,
        path: str,
        route: typing.Callable,
        methods: typing.List[str] = None,
        name: str = None,
        include_in_schema: bool = True,
    ) -> None:
        self.router.add_route(
            path, route, methods=methods, name=name, include_in_schema=include_in_schema
        )

    def add_websocket_route(
        self, path: str, route: typing.Callable, name: str = None
    ) -> None:
        self.router.add_websocket_route(path, route, name=name)

以及对上述方法的一些包装

    def exception_handler(
        self, exc_class_or_status_code: typing.Union[int, typing.Type[Exception]]
    ) -> typing.Callable:
        def decorator(func: typing.Callable) -> typing.Callable:
            self.add_exception_handler(exc_class_or_status_code, func)
            return func

        return decorator

    def route(
        self,
        path: str,
        methods: typing.List[str] = None,
        name: str = None,
        include_in_schema: bool = True,
    ) -> typing.Callable:
        def decorator(func: typing.Callable) -> typing.Callable:
            self.router.add_route(
                path,
                func,
                methods=methods,
                name=name,
                include_in_schema=include_in_schema,
            )
            return func

        return decorator

    def websocket_route(self, path: str, name: str = None) -> typing.Callable:
        def decorator(func: typing.Callable) -> typing.Callable:
            self.router.add_websocket_route(path, func, name=name)
            return func

        return decorator

    def middleware(self, middleware_type: str) -> typing.Callable:
        assert (
            middleware_type == "http"
        ), 'Currently only middleware("http") is supported.'

        def decorator(func: typing.Callable) -> typing.Callable:
            self.add_middleware(BaseHTTPMiddleware, dispatch=func)
            return func

        return decorator

以上为Starlette库中applications.py为主的源码解读, 如有错误欢迎指正.

上一篇下一篇

猜你喜欢

热点阅读