技术文章Uvicorn 源码解读程序员

Uvicorn 基础工作原理

2020-08-21  本文已影响0人  Gascognya

Uvicorn运行时,分为两大部分。

在上一章 Asyncio 协议Protocol 与 传输Transport 中我们提到了关于Asyncio提供的网络服务器。

服务器启动

main.py → run()是uvicorn的启动入口,他可以从uvicorn.run调用启动,也可以从命令行启动。
之前的一篇 Uvicorn 基本了解介绍了关于如何启动Uvicorn。
main.py → run()会创建若干个class Server的实例,通常为一个,代表Uvicorn服务器自身。

Server会进行初始化,例如构建前半的中间件堆栈(starlette中的我们可以称之为后半)。初始化完成后,服务器会在main_loop()中不断循环来维护自身状态。
在初始化阶段,Server会构建Asyncio的网络服务器。
流程如下:

config.py → Config类 → load
if isinstance(self.http, str):
    # 设置HTTP协议实现。httptools实现提供了更高的性能,但与PyPy不兼容,并且需要在Windows上进行编译。
    # 选项: “ auto”,“ h11”,“ httptools”。 默认值: 'auto'。
    self.http_protocol_class = import_from_string(HTTP_PROTOCOLS[self.http])
config.py → HTTP_PROTOCOLS
HTTP_PROTOCOLS = {
    "auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
    "h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
    "httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
}
main.py → Server类 → startup
create_protocol = functools.partial(
            config.http_protocol_class, config=config, server_state=self.server_state
        )
server = await loop.create_server(
    create_protocol,
    host=config.host,
    port=config.port,
    ssl=config.ssl,
    backlog=config.backlog,
)

到此为止,两大部分就都构建完成了,进入运行时状态

H11Protocol

class H11Protocol(asyncio.Protocol)是由uvicorn提供的一个基于H11协议库实现的Protocol,它是整个uvicorn通信的核心(如果选用httptools库就是HttpToolsProtocol)

H11Protocol
可以看到他实现了Protocol的接口,socket接收到的http报文会被传递至data_received(),这样,数据的传入就完成了。
    def data_received(self, data):
        if self.timeout_keep_alive_task is not None:
            self.timeout_keep_alive_task.cancel()
            self.timeout_keep_alive_task = None

        self.conn.receive_data(data)
        # 引用: self.conn = h11.Connection(h11.SERVER)
        # 调用h11,此步仅将数据加入接受缓冲区
        
        self.handle_events()
        # 开始处理

可以看到,接收到数据,首先是将数据加入到缓冲区。然后再进行处理。

def handle_events(self):
                event = self.conn.next_event()

                ......

                self.scope = {
                    "type": "http",
                    "asgi": {
                        "version": self.config.asgi_version,
                        "spec_version": "2.1",
                    },
                    "http_version": event.http_version.decode("ascii"),
                    "server": self.server,
                    "client": self.client,
                    "scheme": self.scheme,
                    "method": event.method.decode("ascii"),
                    "root_path": self.root_path,
                    "path": unquote(raw_path.decode("ascii")),
                    "raw_path": raw_path,
                    "query_string": query_string,
                    "headers": self.headers,
                }

                ......

                self.cycle = RequestResponseCycle(
                    scope=self.scope,
                    conn=self.conn,
                    transport=self.transport,
                    flow=self.flow,
                    logger=self.logger,
                    access_logger=self.access_logger,
                    access_log=self.access_log,
                    default_headers=self.default_headers,
                    message_event=self.message_event,
                    on_response=self.on_response_complete,
                )
                task = self.loop.create_task(self.cycle.run_asgi(app))

RequestResponseCycle

RequestResponseCycle

我们可以看到其具有sendreceive两个方法。让人感到很熟悉。

    async def run_asgi(self, app):
            result = await app(self.scope, self.receive, self.send)
            ......

这里的app,是指uvicorn构建的中间件堆栈,我们切回Config

    def load(self):

        ......

        # 开始中间件堆栈,和starlette中的构建方法完全一致
        # 最内层为wsgi/asgi中间件,其app属性指向传进来的app(starlette, fastapi等实例)
        # 实际上就是将两段app链相连接
        if self.interface == "wsgi":
            self.loaded_app = WSGIMiddleware(self.loaded_app)
            self.ws_protocol_class = None
        elif self.interface == "asgi2":
            self.loaded_app = ASGI2Middleware(self.loaded_app)
        if self.debug:
            self.loaded_app = DebugMiddleware(self.loaded_app)
        if logger.level <= TRACE_LOG_LEVEL:
            self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
        if self.proxy_headers:
            self.loaded_app = ProxyHeadersMiddleware(
                self.loaded_app, trusted_hosts=self.forwarded_allow_ips
            )

起初这个loaded_appstarlette的实例。
最终构成了ProxyHeadersMiddleware为起点,endpointApp为终点的App链(中间件堆栈包含于其中)
Uvicorn中间件堆栈Starlette实例Starlette中间件堆栈Routerendpoint/cbv
这就是整个服务器的全貌
我们来看一send和receive方法

Send

    # ASGI 接口
    async def send(self, message):
        message_type = message["type"]

        if self.flow.write_paused and not self.disconnected:
            await self.flow.drain()

        if self.disconnected:
            return

        # send函数,分为两个阶段,三次发送
        # 第一次发送type = http.response.start,发送request头
        # 第二次发送type = http.response.body,发送body
        # 然后查看是否还要more body,没有则发送b""
        if not self.response_started:
            # 发送response 状态行和 headers
            if message_type != "http.response.start":
                msg = "Expected ASGI message 'http.response.start', but got '%s'."
                raise RuntimeError(msg % message_type)

            self.response_started = True
            self.waiting_for_100_continue = False

            status_code = message["status"]
            headers = self.default_headers + message.get("headers", [])

            if self.access_log:
                self.access_logger.info(
                    '%s - "%s %s HTTP/%s" %d',
                    get_client_addr(self.scope),
                    self.scope["method"],
                    get_path_with_query_string(self.scope),
                    self.scope["http_version"],
                    status_code,
                    extra={"status_code": status_code, "scope": self.scope},
                )

            # 写入response状态行和 headers
            reason = STATUS_PHRASES[status_code]
            event = h11.Response(
                status_code=status_code, headers=headers, reason=reason
            )
            output = self.conn.send(event)
            self.transport.write(output)

        elif not self.response_complete:
            # 发送 response body
            if message_type != "http.response.body":
                msg = "Expected ASGI message 'http.response.body', but got '%s'."
                raise RuntimeError(msg % message_type)

            body = message.get("body", b"")
            more_body = message.get("more_body", False)

            # 写入 response body
            if self.scope["method"] == "HEAD":
                event = h11.Data(data=b"")
            else:
                event = h11.Data(data=body)
            output = self.conn.send(event)
            self.transport.write(output)

            # 处理response完成
            if not more_body:
                # body已全部发送完毕
                self.response_complete = True
                event = h11.EndOfMessage()
                output = self.conn.send(event)
                self.transport.write(output)
                # 第三次发送,发送b""表示结束
        else:
            # Response 已经发送
            msg = "Unexpected ASGI message '%s' sent, after response already completed."
            raise RuntimeError(msg % message_type)

        if self.response_complete:
            if self.conn.our_state is h11.MUST_CLOSE or not self.keep_alive:
                event = h11.ConnectionClosed()
                self.conn.send(event)
                self.transport.close()
            self.on_response()

send中,包含了大量的服务器状态切换,其逻辑极为复杂。尤其是关于服务器的状态机方面,让人头昏眼花。
在发送的时候,报文分至少三次发送:

receive

    async def receive(self):
        if self.waiting_for_100_continue and not self.transport.is_closing():
            event = h11.InformationalResponse(
                status_code=100, headers=[], reason="Continue"
            )
            output = self.conn.send(event)
            self.transport.write(output)
            self.waiting_for_100_continue = False

        if not self.disconnected and not self.response_complete:
            self.flow.resume_reading()
            await self.message_event.wait()
            self.message_event.clear()

        if self.disconnected or self.response_complete:
            message = {"type": "http.disconnect"}
        else:
            message = {
                "type": "http.request",
                "body": self.body,
                "more_body": self.more_body,
            }
            self.body = b""

        return message

receive的使用比较少,在传统http请求中基本不需要,主要是用于websocket,在此不多做介绍。

到此为止,在starlette中看到的send,receive和scope三者的来源,我们已经搞清了。

关于Uvicorn的更深一步探索,我们将将在未来进行。目前的准备还不够充分,贸然进行容易出现错误。

上一篇下一篇

猜你喜欢

热点阅读