Starlette 解读 by Gascognya

Starlette 源码阅读 (五) response

2020-08-13  本文已影响0人  Gascognya

response.py

提供了多种response模型, 都是由Response类衍生而来, 逻辑都比较简单.


response.py

Response类

class Response:
    """
    Base HTTP response.

    Renders ``content`` to bytes, builds the raw ASGI header list, and sends
    both when the instance is awaited as an ASGI application.
    """

    media_type = None
    charset = "utf-8"

    def __init__(
        self,
        content: typing.Any = None,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
    ) -> None:
        """
        Store the status and background task, then render body and headers.

        ``media_type`` only shadows the class-level default when it is given.
        """
        self.status_code = status_code
        self.background = background
        if media_type is not None:
            self.media_type = media_type

        # The body is rendered first because init_headers() measures it to
        # fill in content-length.
        self.body = self.render(content)
        self.init_headers(headers)

    def render(self, content: typing.Any) -> bytes:
        """
        Convert ``content`` to bytes.

        ``None`` becomes an empty body, bytes pass through untouched, and
        anything else is encoded with ``self.charset``.
        """
        if content is None:
            return b""
        return content if isinstance(content, bytes) else content.encode(self.charset)

    def init_headers(self, headers: typing.Mapping[str, str] = None) -> None:
        """
        Build ``self.raw_headers`` as a list of ``(name, value)`` byte pairs.

        Caller-supplied headers are lower-cased and latin-1 encoded.
        content-length and content-type are added automatically unless the
        caller already supplied them.
        """
        raw_headers = []  # type: typing.List[typing.Tuple[bytes, bytes]]
        if headers is not None:
            for name, value in headers.items():
                raw_headers.append(
                    (name.lower().encode("latin-1"), value.encode("latin-1"))
                )

        # A flag is True when the header is absent and may be filled in here.
        supplied = {name for name, _ in raw_headers}
        populate_content_length = b"content-length" not in supplied
        populate_content_type = b"content-type" not in supplied

        # Subclasses may call this before (or without) setting self.body.
        body = getattr(self, "body", b"")
        if populate_content_length and body:
            length = str(len(body)).encode("latin-1")
            raw_headers.append((b"content-length", length))

        media = self.media_type
        if populate_content_type and media is not None:
            # Text types advertise the charset used by render().
            if media.startswith("text/"):
                media = media + "; charset=" + self.charset
            raw_headers.append((b"content-type", media.encode("latin-1")))

        self.raw_headers = raw_headers

    @property
    def headers(self) -> MutableHeaders:
        """
        Mutable view over ``self.raw_headers``, created on first access and
        cached on the instance as ``_headers``.
        """
        try:
            return self._headers
        except AttributeError:
            self._headers = MutableHeaders(raw=self.raw_headers)
            return self._headers

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: int = None,
        expires: int = None,
        path: str = "/",
        domain: str = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str = "lax",
    ) -> None:
        """
        Append a ``set-cookie`` header for ``key``.

        Attributes whose argument is ``None`` (or ``False`` for the two flags)
        are left off the cookie. ``samesite`` must be 'strict', 'lax' or
        'none' in any letter case.
        """
        cookie = http.cookies.SimpleCookie()  # type: http.cookies.BaseCookie
        cookie[key] = value
        morsel = cookie[key]

        valued_attributes = (
            ("max-age", max_age),
            ("expires", expires),
            ("path", path),
            ("domain", domain),
        )
        for attribute, setting in valued_attributes:
            if setting is not None:
                morsel[attribute] = setting

        if secure:
            morsel["secure"] = True
        if httponly:
            morsel["httponly"] = True

        if samesite is not None:
            assert samesite.lower() in (
                "strict",
                "lax",
                "none",
            ), "samesite must be either 'strict', 'lax' or 'none'"
            morsel["samesite"] = samesite

        # header="" drops the "Set-Cookie:" prefix, leaving only the value.
        cookie_val = cookie.output(header="").strip()
        self.raw_headers.append((b"set-cookie", cookie_val.encode("latin-1")))

    def delete_cookie(self, key: str, path: str = "/", domain: str = None) -> None:
        """Expire the cookie ``key`` by re-setting it with zero lifetime."""
        self.set_cookie(key, max_age=0, expires=0, path=path, domain=domain)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Send the status line and headers, then the body, then run the
        background task if one was supplied.
        """
        start_message = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
        await send(start_message)
        await send({"type": "http.response.body", "body": self.body})

        background = self.background
        if background is not None:
            await background()

HTMLResponse & PlainTextResponse

仅改变了media type

class HTMLResponse(Response):
    """Response whose media type is "text/html"; nothing else is overridden."""

    media_type = "text/html"

class PlainTextResponse(Response):
    """Response whose media type is "text/plain"; nothing else is overridden."""

    media_type = "text/plain"

关于JSON的Response

提供了两种: JSONResponse 和 UJSONResponse
他们都修改了media type并且重写了render方法
两者区别主要在使用了json包还是ujson

json与ujson的性能对比 - 简书
ujson性能比json更高, 该文章中的例子速度达到了2-4倍

class JSONResponse(Response):
    """Response that serialises its content with the standard ``json`` module."""

    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        """
        Serialise ``content`` to compact JSON and return it as UTF-8 bytes.

        Non-ASCII characters are emitted as-is, NaN/Infinity values are
        rejected by ``json.dumps`` (``allow_nan=False``), and no whitespace
        is written between items.
        """
        dump_options = {
            "ensure_ascii": False,
            "allow_nan": False,
            "indent": None,
            "separators": (",", ":"),
        }
        text = json.dumps(content, **dump_options)
        return text.encode("utf-8")

class UJSONResponse(JSONResponse):
    """JSON response that serialises with ``ujson`` instead of ``json``."""

    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        """
        Serialise ``content`` with ``ujson`` and return it as UTF-8 bytes.

        Fails the assertion when the module-level ``ujson`` name is ``None``.
        """
        assert ujson is not None, "ujson must be installed to use UJSONResponse"
        text = ujson.dumps(content, ensure_ascii=False)
        return text.encode("utf-8")

RedirectResponse

重定向

class RedirectResponse(Response):
    """Empty-bodied response that redirects the client via a ``location`` header."""

    def __init__(
        self,
        url: typing.Union[str, URL],
        status_code: int = 307,
        headers: dict = None,
        background: BackgroundTask = None,
    ) -> None:
        """
        Build the redirect.

        ``url`` is converted with ``str()`` and percent-encoded before being
        stored in the ``location`` header. ``status_code`` defaults to 307.
        ``headers`` and ``background`` are passed through to ``Response``.
        """
        super().__init__(
            content=b"", status_code=status_code, headers=headers, background=background
        )
        # quote() is used rather than quote_plus(): quote_plus() rewrites a
        # space as "+", which is form encoding and is read as a literal plus
        # sign inside a URL path. quote() emits "%20" instead. Characters in
        # the safe set, including "%", are left untouched, so a URL that is
        # already percent-encoded is not encoded twice.
        self.headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")

StreamingResponse

接收数据流的Response。推测其功能是: 由调用方提供一个异步生成器(或普通迭代器), 发送数据时从中逐个取出数据块并发送出去, 如此循环往复。其作用之一, 推测是可以在endpoint结束之后再获取要发送的数据; 此外同样可以添加后台任务。

class StreamingResponse(Response):
    """
    Response whose body is pulled chunk by chunk from an iterator at send
    time instead of being rendered up front.
    """

    def __init__(
        self,
        content: typing.Any,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
    ) -> None:
        """
        Store the chunk source and build the headers.

        No body is rendered here, so no content-length header is generated.
        """
        # An async generator is consumed directly; any other content is
        # wrapped by iterate_in_threadpool.
        self.body_iterator = (
            content if inspect.isasyncgen(content) else iterate_in_threadpool(content)
        )
        self.status_code = status_code
        if media_type is None:
            media_type = self.media_type
        self.media_type = media_type
        self.background = background
        self.init_headers(headers)

    async def listen_for_disconnect(self, receive: Receive) -> None:
        """Return once an "http.disconnect" message is received."""
        message = await receive()
        while message["type"] != "http.disconnect":
            message = await receive()

    async def stream_response(self, send: Send) -> None:
        """
        Send the response start, then every chunk, then an empty final chunk.

        Chunks that are not bytes are encoded with ``self.charset``.
        """
        start_message = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
        await send(start_message)

        async for piece in self.body_iterator:
            payload = piece if isinstance(piece, bytes) else piece.encode(self.charset)
            await send({"type": "http.response.body", "body": payload, "more_body": True})

        # An empty body with more_body=False marks the end of the stream.
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Stream the body while listening for a disconnect, then run the
        background task if one was supplied.
        """
        # Both are handed to run_until_first_complete: the streaming task and
        # the disconnect listener.
        streaming = (self.stream_response, {"send": send})
        disconnect = (self.listen_for_disconnect, {"receive": receive})
        await run_until_first_complete(streaming, disconnect)

        background = self.background
        if background is not None:
            await background()

FileResponse

class FileResponse(Response):
    """Response that sends the file at ``path`` in fixed-size chunks."""

    chunk_size = 4096

    def __init__(
        self,
        path: str,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
        filename: str = None,
        stat_result: os.stat_result = None,
        method: str = None,
    ) -> None:
        """
        Record the file location and prepare the headers.

        When ``media_type`` is omitted it is guessed from ``filename`` (or
        ``path``) and falls back to "text/plain". A ``filename`` adds a
        content-disposition header. A ``stat_result`` adds content-length,
        last-modified and etag immediately instead of at send time. When
        ``method`` is "HEAD" in any letter case, only headers are sent.
        """
        assert aiofiles is not None, "'aiofiles' must be installed to use FileResponse"
        self.path = path
        self.filename = filename
        self.status_code = status_code
        self.background = background
        self.send_header_only = method is not None and method.upper() == "HEAD"
        if media_type is None:
            guessed = guess_type(filename or path)[0]
            media_type = guessed or "text/plain"
        self.media_type = media_type
        self.init_headers(headers)

        if self.filename is not None:
            encoded_name = quote(self.filename)
            if encoded_name == self.filename:
                # Quoting changed nothing, so the plain quoted form is used.
                content_disposition = 'attachment; filename="{}"'.format(self.filename)
            else:
                # The name needed percent-encoding, so the filename*= form
                # with an explicit utf-8 marker is used.
                content_disposition = "attachment; filename*=utf-8''{}".format(
                    encoded_name
                )
            self.headers.setdefault("content-disposition", content_disposition)

        self.stat_result = stat_result
        if stat_result is not None:
            self.set_stat_headers(stat_result)

    def set_stat_headers(self, stat_result: os.stat_result) -> None:
        """
        Add content-length, last-modified and etag derived from
        ``stat_result``, without overriding headers that are already set.
        """
        size = str(stat_result.st_size)
        modified_at = stat_result.st_mtime
        # The etag is the MD5 hex digest of "<mtime>-<size>".
        etag_base = str(modified_at) + "-" + size
        stat_headers = (
            ("content-length", size),
            ("last-modified", formatdate(modified_at, usegmt=True)),
            ("etag", hashlib.md5(etag_base.encode()).hexdigest()),
        )
        for name, value in stat_headers:
            self.headers.setdefault(name, value)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Send the headers and, unless this is a header-only response, the file
        contents, then run the background task if one was supplied.

        Raises:
            RuntimeError: no ``stat_result`` was supplied and the path does
                not exist or is not a regular file.
        """
        if self.stat_result is None:
            # No stat result was supplied up front, so stat the file now.
            try:
                stat_result = await aio_stat(self.path)
                self.set_stat_headers(stat_result)
            except FileNotFoundError:
                raise RuntimeError(f"File at path {self.path} does not exist.")
            if not stat.S_ISREG(stat_result.st_mode):
                raise RuntimeError(f"File at path {self.path} is not a file.")

        start_message = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
        await send(start_message)

        if self.send_header_only:
            await send({"type": "http.response.body", "body": b"", "more_body": False})
        else:
            async with aiofiles.open(self.path, mode="rb") as file:
                while True:
                    chunk = await file.read(self.chunk_size)
                    # A full-sized chunk is taken to mean more data follows.
                    # If the file size is an exact multiple of chunk_size, the
                    # last full chunk still reports more_body=True and the
                    # body is closed by one further (presumably empty) chunk.
                    more_body = len(chunk) == self.chunk_size
                    await send(
                        {
                            "type": "http.response.body",
                            "body": chunk,
                            "more_body": more_body,
                        }
                    )
                    if not more_body:
                        break

        background = self.background
        if background is not None:
            await background()

responses和requests两个模块都属于功能性模块, 不具有多少业务逻辑。至此, 从app应用到endpoint这一段的大体流程, 已经有了初步印象。下篇文章将对这前半程进行总结归纳, 然后继续返程的业务解读。

上一篇下一篇

猜你喜欢

热点阅读