Uvicorn 基础工作原理
Uvicorn运行时,分为两大部分。
- 一个是uvicorn本身的循环,用于服务器自身状态的维护,不参与通信。
- 另一个是由uvicorn建立的Asyncio的服务器,提交由h11或httptools协议库实现的Protocol。
在上一章 Asyncio 协议Protocol 与 传输Transport 中我们提到了关于Asyncio提供的网络服务器。
服务器启动
main.py → run()
是uvicorn的启动入口,他可以从uvicorn.run调用启动,也可以从命令行启动。
之前的一篇 Uvicorn 基本了解介绍了关于如何启动Uvicorn。
main.py → run()
会创建若干个class Server
的实例,通常为一个,代表Uvicorn服务器自身。
Server
会进行初始化,例如构建前半的中间件堆栈(starlette中的我们可以称之为后半)。初始化完成后,服务器会在main_loop()中不断循环来维护自身状态。
在初始化阶段,Server会构建Asyncio的网络服务器。
流程如下:
config.py → Config类 → load
if isinstance(self.http, str):
# 设置HTTP协议实现。httptools实现提供了更高的性能,但与PyPy不兼容,并且需要在Windows上进行编译。
# 选项: “ auto”,“ h11”,“ httptools”。 默认值: 'auto'。
self.http_protocol_class = import_from_string(HTTP_PROTOCOLS[self.http])
config.py → HTTP_PROTOCOLS
HTTP_PROTOCOLS = {
"auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
"h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
"httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
}
main.py → Server类 → startup
create_protocol = functools.partial(
config.http_protocol_class, config=config, server_state=self.server_state
)
server = await loop.create_server(
create_protocol,
host=config.host,
port=config.port,
ssl=config.ssl,
backlog=config.backlog,
)
到此为止,两大部分就都构建完成了,进入运行时状态
H11Protocol
class H11Protocol(asyncio.Protocol)
是由uvicorn提供的一个基于H11协议库实现的Protocol,它是整个uvicorn通信的核心(如果选用httptools库就是HttpToolsProtocol)
可以看到他实现了Protocol的接口,socket接收到的http报文会被传递至data_received(),这样,数据的传入就完成了。
def data_received(self, data):
if self.timeout_keep_alive_task is not None:
self.timeout_keep_alive_task.cancel()
self.timeout_keep_alive_task = None
self.conn.receive_data(data)
# 引用: self.conn = h11.Connection(h11.SERVER)
# 调用h11,此步仅将数据加入接受缓冲区
self.handle_events()
# 开始处理
可以看到,接收到数据,首先是将数据加入到缓冲区。然后再进行处理。
def handle_events(self):
event = self.conn.next_event()
......
self.scope = {
"type": "http",
"asgi": {
"version": self.config.asgi_version,
"spec_version": "2.1",
},
"http_version": event.http_version.decode("ascii"),
"server": self.server,
"client": self.client,
"scheme": self.scheme,
"method": event.method.decode("ascii"),
"root_path": self.root_path,
"path": unquote(raw_path.decode("ascii")),
"raw_path": raw_path,
"query_string": query_string,
"headers": self.headers,
}
......
self.cycle = RequestResponseCycle(
scope=self.scope,
conn=self.conn,
transport=self.transport,
flow=self.flow,
logger=self.logger,
access_logger=self.access_logger,
access_log=self.access_log,
default_headers=self.default_headers,
message_event=self.message_event,
on_response=self.on_response_complete,
)
task = self.loop.create_task(self.cycle.run_asgi(app))
RequestResponseCycle
RequestResponseCycle我们可以看到其具有send
和receive
两个方法。让人感到很熟悉。
async def run_asgi(self, app):
result = await app(self.scope, self.receive, self.send)
......
这里的app,是指uvicorn构建的中间件堆栈,我们切回Config
类
def load(self):
......
# 开始中间件堆栈,和starlette中的构建方法完全一致
# 最内层为wsgi/asgi中间件,其app属性指向传进来的app(starlette, fastapi等实例)
# 实际上就是将两段app链相连接
if self.interface == "wsgi":
self.loaded_app = WSGIMiddleware(self.loaded_app)
self.ws_protocol_class = None
elif self.interface == "asgi2":
self.loaded_app = ASGI2Middleware(self.loaded_app)
if self.debug:
self.loaded_app = DebugMiddleware(self.loaded_app)
if logger.level <= TRACE_LOG_LEVEL:
self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
if self.proxy_headers:
self.loaded_app = ProxyHeadersMiddleware(
self.loaded_app, trusted_hosts=self.forwarded_allow_ips
)
起初这个loaded_app
为starlette
的实例。
最终构成了ProxyHeadersMiddleware
为起点,endpointApp为终点的App链(中间件堆栈包含于其中)
Uvicorn中间件堆栈
→ Starlette实例
→ Starlette中间件堆栈
→ Router
→ endpoint/cbv
这就是整个服务器的全貌
我们来看一send和receive方法
Send
# ASGI 接口
async def send(self, message):
message_type = message["type"]
if self.flow.write_paused and not self.disconnected:
await self.flow.drain()
if self.disconnected:
return
# send函数,分为两个阶段,三次发送
# 第一次发送type = http.response.start,发送request头
# 第二次发送type = http.response.body,发送body
# 然后查看是否还要more body,没有则发送b""
if not self.response_started:
# 发送response 状态行和 headers
if message_type != "http.response.start":
msg = "Expected ASGI message 'http.response.start', but got '%s'."
raise RuntimeError(msg % message_type)
self.response_started = True
self.waiting_for_100_continue = False
status_code = message["status"]
headers = self.default_headers + message.get("headers", [])
if self.access_log:
self.access_logger.info(
'%s - "%s %s HTTP/%s" %d',
get_client_addr(self.scope),
self.scope["method"],
get_path_with_query_string(self.scope),
self.scope["http_version"],
status_code,
extra={"status_code": status_code, "scope": self.scope},
)
# 写入response状态行和 headers
reason = STATUS_PHRASES[status_code]
event = h11.Response(
status_code=status_code, headers=headers, reason=reason
)
output = self.conn.send(event)
self.transport.write(output)
elif not self.response_complete:
# 发送 response body
if message_type != "http.response.body":
msg = "Expected ASGI message 'http.response.body', but got '%s'."
raise RuntimeError(msg % message_type)
body = message.get("body", b"")
more_body = message.get("more_body", False)
# 写入 response body
if self.scope["method"] == "HEAD":
event = h11.Data(data=b"")
else:
event = h11.Data(data=body)
output = self.conn.send(event)
self.transport.write(output)
# 处理response完成
if not more_body:
# body已全部发送完毕
self.response_complete = True
event = h11.EndOfMessage()
output = self.conn.send(event)
self.transport.write(output)
# 第三次发送,发送b""表示结束
else:
# Response 已经发送
msg = "Unexpected ASGI message '%s' sent, after response already completed."
raise RuntimeError(msg % message_type)
if self.response_complete:
if self.conn.our_state is h11.MUST_CLOSE or not self.keep_alive:
event = h11.ConnectionClosed()
self.conn.send(event)
self.transport.close()
self.on_response()
send中,包含了大量的服务器状态切换,其逻辑极为复杂。尤其是关于服务器的状态机方面,让人头昏眼花。
在发送的时候,报文分至少三次发送:
- 第一次,发送Headers,type为
http.response.start
(在starlette中见过),发送完毕后self.response_started
置True
。 - 第二次,发送Body,此过程可能持续多次。type为
http.response.body
,如果没有更多的body数据了,self.response_complete
置True
- 第三次,按照HTTP协议,发送b' '空字节,表示发送完毕。整个发送过程完成。
receive
async def receive(self):
if self.waiting_for_100_continue and not self.transport.is_closing():
event = h11.InformationalResponse(
status_code=100, headers=[], reason="Continue"
)
output = self.conn.send(event)
self.transport.write(output)
self.waiting_for_100_continue = False
if not self.disconnected and not self.response_complete:
self.flow.resume_reading()
await self.message_event.wait()
self.message_event.clear()
if self.disconnected or self.response_complete:
message = {"type": "http.disconnect"}
else:
message = {
"type": "http.request",
"body": self.body,
"more_body": self.more_body,
}
self.body = b""
return message
receive的使用比较少,在传统http请求中基本不需要,主要是用于websocket,在此不多做介绍。
到此为止,在starlette中看到的send,receive和scope三者的来源,我们已经搞清了。
关于Uvicorn的更深一步探索,我们将将在未来进行。目前的准备还不够充分,贸然进行容易出现错误。