Starlette 解读 by Gascognya

Starlette 源码阅读 (七) endpoint

2020-08-15  本文已影响0人  Gascognya

endpoints.py

关于cbv的源码,分http的cbv和ws的cbv两种

HTTPEndpoint类

class HTTPEndpoint:
    """
    cbv,老朋友了。本身属于一个app。
    可以看到原理是极其简单的
    """
    def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
        assert scope["type"] == "http"
        self.scope = scope
        self.receive = receive
        self.send = send

    def __await__(self) -> typing.Generator:
        return self.dispatch().__await__()

    async def dispatch(self) -> None:
        # 分发请求
        # 通常cbv会重写get post等方法,而请求传入到cbv实例中,究竟该调用哪个方法。
        # 会在此进行派送
        request = Request(self.scope, receive=self.receive)
        handler_name = "get" if request.method == "HEAD" else request.method.lower()
        # 如果是head方法就转换成get方法
        handler = getattr(self, handler_name, self.method_not_allowed)
        # 用getattr的方式找同名方法
        is_async = asyncio.iscoroutinefunction(handler)
        # 判断是否为协程,寻找相应调用方式
        if is_async:
            response = await handler(request)
        else:
            response = await run_in_threadpool(handler, request)
        await response(self.scope, self.receive, self.send)

    async def method_not_allowed(self, request: Request) -> Response:
        # 如果我们在starlette应用中运行, 且抛出了一个错误
        # 那么可配置的异常处理程序, 便可处理返回的response并触发一场
        # 对于简单的ASGI程序来说, 只仅仅返回response即可
        # routing.py里完全一致的原文,直接把翻译超过来了
        if "app" in self.scope:
            raise HTTPException(status_code=405)
        return PlainTextResponse("Method Not Allowed", status_code=405)
        # 判断报错方式

WebSocketEndpoint类

class WebSocketEndpoint:

    encoding: typing.Optional[str] = None  # 可能是 "text", "bytes", 或者 "json".

    def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
        assert scope["type"] == "websocket"
        self.scope = scope
        self.receive = receive
        self.send = send

    def __await__(self) -> typing.Generator:
        return self.dispatch().__await__()

    async def dispatch(self) -> None:
        websocket = WebSocket(self.scope, receive=self.receive, send=self.send)
        await self.on_connect(websocket)
        # 执行重写的on_connect方法
        # 建立ws连接
        close_code = status.WS_1000_NORMAL_CLOSURE

        try:
            while True:
                # 不断接受信息,直到断开连接
                message = await websocket.receive()
                if message["type"] == "websocket.receive":
                    data = await self.decode(websocket, message)
                    await self.on_receive(websocket, data)
                    # 执行重写的on_receive方法
                elif message["type"] == "websocket.disconnect":
                    close_code = int(message.get("code", status.WS_1000_NORMAL_CLOSURE))
                    break
        except Exception as exc:
            close_code = status.WS_1011_INTERNAL_ERROR
            raise exc from None
        finally:
            # 将发生异常或者断开连接的状态码,传给重写的on_disconnect方法
            await self.on_disconnect(websocket, close_code)

    async def decode(self, websocket: WebSocket, message: Message) -> typing.Any:

        if self.encoding == "text":
            if "text" not in message:
                await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
                raise RuntimeError("Expected text websocket messages, but got bytes")
            return message["text"]

        elif self.encoding == "bytes":
            if "bytes" not in message:
                await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
                raise RuntimeError("Expected bytes websocket messages, but got text")
            return message["bytes"]

        elif self.encoding == "json":
            if message.get("text") is not None:
                text = message["text"]
            else:
                text = message["bytes"].decode("utf-8")

            try:
                return json.loads(text)
            except json.decoder.JSONDecodeError:
                await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
                raise RuntimeError("Malformed JSON data received.")

        assert (
            self.encoding is None
        ), f"Unsupported 'encoding' attribute {self.encoding}"
        return message["text"] if message.get("text") else message["bytes"]

    async def on_connect(self, websocket: WebSocket) -> None:
        """重写当websocket连接的方法"""
        await websocket.accept()

    async def on_receive(self, websocket: WebSocket, data: typing.Any) -> None:
        """重写当websocket收到消息的方法"""

    async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
        """重写当websocket断开连接的方法"""

做一个小统计

行数统计
做了下统计,applications.py 141,endpoints.py 88,requests.py 206,responses.py 275,routing.py 516,websockets.py 118
现在已经过了1344行,源码总共有4157行,还任重而道远啊。下一章决定啃最大一块datastructures.py。作为经验尚浅的初学者,对python高深境界的了解还远远不足。尚待努力。
上一篇 下一篇

猜你喜欢

热点阅读