程序员Starlette 解读 by Gascognya

Starlette 源码阅读 (二) 路由

2020-08-09  本文已影响0人  Gascognya
本篇开始阅读Starlette的routing.py源码
routing.py结构

从Router类开始

class Router:
    """
        参数:
            (1) routes: 路由列表
            (2) redirect_slashes: 重定向斜杠
            (3) default: 处理无法匹配项的最基础App
            (4) on_start: 启动事件列表
            (5) on_shutdown: 结束事件列表
            (6) lifespan: 上述两者的合并
    """
    def __init__(
        self,
        routes: typing.Sequence[BaseRoute] = None,
        redirect_slashes: bool = True,
        default: ASGIApp = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
    ) -> None:
        self.routes = [] if routes is None else list(routes)
        self.redirect_slashes = redirect_slashes
        self.default = self.not_found if default is None else default
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

        async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
            await self.startup()
            yield
            await self.shutdown()

        # 将on_start和on_shutdown合并成一个生成器, 赋值给lifespan, 如果lifespan不存在
        # 运行所有启动事件 → yield → 运行所有结束事件
        self.lifespan_context = default_lifespan if lifespan is None else lifespan

    async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "websocket":
            websocket_close = WebSocketClose()
            await websocket_close(scope, receive, send)
            return

        # 如果我们在starlette应用中运行, 且抛出了一个错误
        # 那么可配置的异常处理程序, 便可处理返回的response并触发一场
        # 对于简单的ASGI程序来说, 只仅仅返回response即可
        if "app" in scope:
            raise HTTPException(status_code=404)
        else:
            response = PlainTextResponse("Not Found", status_code=404)
        await response(scope, receive, send)

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        for route in self.routes:
            try:
                return route.url_path_for(name, **path_params)
            except NoMatchFound:
                pass
        raise NoMatchFound()
启动时间与结束事件

startup()与shutdown()为上文self.lifespan_context = default_lifespan
def default_lifespan的定义

    async def startup(self) -> None:
        """
        运行所有 启动事件
        """
        for handler in self.on_startup:
            if asyncio.iscoroutinefunction(handler):
                await handler()
            else:
                handler()

    async def shutdown(self) -> None:
        """
        运行所有 结束事件
        """
        for handler in self.on_shutdown:
            if asyncio.iscoroutinefunction(handler):
                await handler()
            else:
                handler()

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        处理 ASGI 生命周期(lifespan)消息, 它允许我们管理启动事件和结束事件
        """
        first = True
        app = scope.get("app")
        message = await receive()
        try:
            if inspect.isasyncgenfunction(self.lifespan_context):  # 判断是否为协程
                async for item in self.lifespan_context(app):
                    # 第一次触发 await self.startup()
                    # 第二次触发 await self.shutdown()
                    assert first, "Lifespan context yielded multiple times."
                    # 第一次触发前为True, 发送启动事件完成信息
                    # 第二次触发前为false, 不再发送该消息, 异常被捕获
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    message = await receive()
                    # 发送生命周期启动事件完成信息, 等待回应
            else:
                for item in self.lifespan_context(app):  # type: ignore
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    message = await receive()
        except BaseException:
            # 捕获错误
            if first:
                # 如果first为true, 说明为第一次完成前发生的意外错误
                exc_text = traceback.format_exc()
                await send({"type": "lifespan.startup.failed", "message": exc_text})
            raise
        else:
            # 否则为人为错误, 发送结束时间完成信息
            await send({"type": "lifespan.shutdown.complete"})
路由主入口
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Router类的主入口
        """
        assert scope["type"] in ("http", "websocket", "lifespan")

        if "router" not in scope:
            scope["router"] = self

        if scope["type"] == "lifespan":
            await self.lifespan(scope, receive, send)
            return

        partial = None

        for route in self.routes:
            # 确认是否有路由和传入的范围匹配
            # 如果发现, 则移交给匹配的路由
            match, child_scope = route.matches(scope)
            if match == Match.FULL:
                scope.update(child_scope)
                await route.handle(scope, receive, send)
                # 如果完全匹配, 则直接交给路由节点的函数处理
                return
            elif match == Match.PARTIAL and partial is None:
                partial = route
                partial_scope = child_scope

        if partial is not None:
            # 处理部分匹配, 在这种情况下, endpoint能够处理请求, 但不是首选.
            # 我们特意使用这个partial来处理 "405 Method Not Allowed"
            scope.update(partial_scope)
            await partial.handle(scope, receive, send)
            return

        # 未匹配的情况, 判断重定向
        if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
            # 如果类型为http 且 自身存在重定向符 且 路径不为根路径
            redirect_scope = dict(scope)
            if scope["path"].endswith("/"):
                redirect_scope["path"] = redirect_scope["path"].rstrip("/")
            else:
                redirect_scope["path"] = redirect_scope["path"] + "/"
            # 路径如果包含/则去掉, 不包含则加上
            for route in self.routes:
                match, child_scope = route.matches(redirect_scope)
                if match != Match.NONE:
                    # 再次进行匹配, 如果结果不为空, 则发送重定向response
                    redirect_url = URL(scope=redirect_scope)
                    response = RedirectResponse(url=str(redirect_url))
                    await response(scope, receive, send)
                    return
        # 完全未匹配情况, 调用自身的default
        # 在定义时, self.default = self.not_found, 来处理404信息
        await self.default(scope, receive, send)

routing.py→Match

class Match(Enum):
    """
    一个匹配程度的枚举类
    分别用0,1,2来表示
    """
    NONE = 0
    PARTIAL = 1
    FULL = 2

# The following usages are now discouraged in favour of configuration during Router.__init__(...)
以此往下同Starlette类, 都是操作自身属性的封装方法

    def mount(self, path: str, app: ASGIApp, name: str = None) -> None:
        route = Mount(path, app=app, name=name)
        self.routes.append(route)

    def host(self, host: str, app: ASGIApp, name: str = None) -> None:
        route = Host(host, app=app, name=name)
        self.routes.append(route)

    def add_route(): ...
    def add_websocket_route(): ...
    def route(): ...
    def websocket_route(): ...
    def add_event_handler(): ...
    def on_event(): ...

前两个方法说明, 除了Route对象, 可以向路由中加载Mount对象以及Host对象, 他们都继承于BaseRoute类

BaseRoute类

class BaseRoute:
    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        raise NotImplementedError()  # pragma: no cover

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        raise NotImplementedError()  # pragma: no cover

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        raise NotImplementedError()  # pragma: no cover

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        一个 Route 可以作为一个独立的ASGI应用程序单独使用
        这是一种人为的情况,因为它们几乎总是在一个 Router 中使用
        但可能对一些工具和最小的应用程序有用
        """
        match, child_scope = self.matches(scope)
        if match == Match.NONE:
            if scope["type"] == "http":
                response = PlainTextResponse("Not Found", status_code=404)
                await response(scope, receive, send)
            elif scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
            return

        scope.update(child_scope)
        await self.handle(scope, receive, send)
        # 其功能与router的__call__部分功能相似, 可以将路径与自身进行匹配

Route类

class Route(BaseRoute):
    """
    单个Route表示一个路由节点, 连接endpoint与路径
    参数:
        (1) path: 匹配路径
        (2) endpoint: 路径处理的函数
        (3) methods: 允许的HTTP方法
        (4) name: 用于反向查找(?)
        (5) include_in_schema: 未知(?)
    """
    def __init__(
            self,
            path: str,
            endpoint: typing.Callable,
            *,
            methods: typing.List[str] = None,
            name: str = None,
            include_in_schema: bool = True,
    ) -> None:
        assert path.startswith("/"), "Routed paths must start with '/'"
        self.path = path
        self.endpoint = endpoint
        self.name = get_name(endpoint) if name is None else name
        # 通过endpoint的函数名(__name__)获取,源码略
        self.include_in_schema = include_in_schema

        if inspect.isfunction(endpoint) or inspect.ismethod(endpoint):
            # Endpoint 是一个函数或者方法 把它看作 `func(request) -> response`.
            self.app = request_response(endpoint)
            # 将endpoint进行封装
            if methods is None:
                methods = ["GET"]
        else:
            # Endpoint 是一个类(cbv). 把它看作 ASGI 应用.
            self.app = endpoint

        if methods is None:
            self.methods = None
        else:
            self.methods = set(method.upper() for method in methods)
            if "GET" in self.methods:
                self.methods.add("HEAD")
        # 设置允许的HTTP方法
        self.path_regex, self.path_format, self.param_convertors = compile_path(path)
        # 分解路径字符串

routing.py→request_response

用于将endpoint封装成app
app传入元数据,直接得到response

def request_response(func: typing.Callable) -> ASGIApp:
    """
    获取一个函数或者协程 `func(request) -> response`,
    然后返回一个 ASGI 应用
    """
    is_coroutine = asyncio.iscoroutinefunction(func)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = Request(scope, receive=receive, send=send)
        # 构建一个request实例
        if is_coroutine:
            response = await func(request)
        else:
            response = await run_in_threadpool(func, request)
        # 如果是协程则执行, 如果是普通函数则加入线程池
        # 这里的func, 即为我们最常写的路由对应的endpoint(请求处理函数)
        # 函数在此被执行, 得到response
        await response(scope, receive, send)

    return app

routing.py→compile_path

# 匹配URL路径中的params参数, 例. '{param}', 或 '{param:int}'
PARAM_REGEX = re.compile("{([a-zA-Z_][a-zA-Z0-9_]*)(:[a-zA-Z_][a-zA-Z0-9_]*)?}")

def compile_path(
        path: str,
) -> typing.Tuple[typing.Pattern, str, typing.Dict[str, Convertor]]:
    """
    提交一个设定的路径参数匹配字符串, 类似"/{username:str}",
    返回一个三元元祖(regex, format, {param_name:convertor}).

    regex:      "/(?P<username>[^/]+)"
    format:     "/{username}"
    convertors: {"username": StringConvertor()}
    """
    path_regex = "^"
    path_format = ""

    idx = 0
    param_convertors = {}
    for match in PARAM_REGEX.finditer(path):
        param_name, convertor_type = match.groups("str")
        # 解析出参数名, 和参数类型
        convertor_type = convertor_type.lstrip(":")
        assert (
                convertor_type in CONVERTOR_TYPES
        ), f"Unknown path convertor '{convertor_type}'"
        # 查询是否在类型字典中
        convertor = CONVERTOR_TYPES[convertor_type]
        # 从类型字典得到对应的参数数据类型转换器

        path_regex += re.escape(path[idx: match.start()])
        # 解析出其中的正则
        path_regex += f"(?P<{param_name}>{convertor.regex})"
        # 正则字符串中加入分组
        path_format += path[idx: match.start()]
        path_format += "{%s}" % param_name

        param_convertors[param_name] = convertor

        idx = match.end()

    path_regex += re.escape(path[idx:]) + "$"
    path_format += path[idx:]

    return re.compile(path_regex), path_format, param_convertors

回到Route类

    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        """
        路由节点的匹配方法, 接收scope字典, 进行解析
        返回 match, child_scope
        """
        if scope["type"] == "http":
            match = self.path_regex.match(scope["path"])
            # 将传过来的path与自身路径正则进行匹配
            if match:
                matched_params = match.groupdict()
                for key, value in matched_params.items():
                    matched_params[key] = self.param_convertors[key].convert(value)
                # 调用对应的转换器
                path_params = dict(scope.get("path_params", {}))
                path_params.update(matched_params)
                child_scope = {"endpoint": self.endpoint, "path_params": path_params}
                if self.methods and scope["method"] not in self.methods:
                    # 如果请求的方法,没在自身允许的方法中则返回部分匹配
                    return Match.PARTIAL, child_scope
                else:
                    # 否则返回全部匹配
                    return Match.FULL, child_scope
        # 未匹配
        return Match.NONE, {}

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        # 同url_for一样,用于反向查找
        seen_params = set(path_params.keys())
        expected_params = set(self.param_convertors.keys())

        if name != self.name or seen_params != expected_params:
            raise NoMatchFound()

        path, remaining_params = replace_params(
            self.path_format, self.param_convertors, path_params
        )
        assert not remaining_params
        return URLPath(path=path, protocol="http")

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        节点的入口
        当route匹配成功,就会调用匹配成功的handle方法
        """
        if self.methods and scope["method"] not in self.methods:
            if "app" in scope:
                raise HTTPException(status_code=405)
            else:
                response = PlainTextResponse("Method Not Allowed", status_code=405)
            await response(scope, receive, send)
        else:
            # 当判断属于支持的方法时,调用封装好的endpoint
            await self.app(scope, receive, send)

    def __eq__(self, other: typing.Any) -> bool:
        return (
                isinstance(other, Route)
                and self.path == other.path
                and self.endpoint == other.endpoint
                and self.methods == other.methods
        )

routing.py余下内容将在下篇继续解读

上一篇 下一篇

猜你喜欢

热点阅读