FastAPI 解读 by Gascognya

FastAPI 依赖注入详解:处理依赖树

2020-10-26  本文已影响0人  Gascognya
    async def app(request: Request) -> Response:

        ......

        solved_result = await solve_dependencies(
            request=request,
            dependant=dependant,
            body=body,
            dependency_overrides_provider=dependency_overrides_provider,
        )

        values, errors, background_tasks, sub_response, _ = solved_result

        if errors:
            raise RequestValidationError(errors, body=body)
        else:
            raw_response = await run_endpoint_function(
                dependant=dependant, values=values, is_coroutine=is_coroutine
            )

        ......

这里是endpoint的前一步,request首先要经过solve_dependencies()来与endpoint的依赖树进行匹配,这相当于API的门卫一般的存在。
匹配的结果包含在solve过程中的结果,错误,以及其他若干信息。有了详细的信息,我们便能精确的定位到问题的所在。

async def solve_dependencies(
        *,
        request: Union[Request, WebSocket],
        dependant: Dependant,
        body: Optional[Union[Dict[str, Any], FormData]] = None,
        background_tasks: Optional[BackgroundTasks] = None,
        response: Optional[Response] = None,
        dependency_overrides_provider: Optional[Any] = None,
        dependency_cache: Optional[Dict[Tuple[Callable, Tuple[str]], Any]] = None,
) -> Tuple[
    Dict[str, Any],
    List[ErrorWrapper],
    Optional[BackgroundTasks],
    Response,
    Dict[Tuple[Callable, Tuple[str]], Any],
]:
    """

    :param request: 请求报文
    :param dependant: endpoint对应的依赖树
    :param body: 请求体
    :param background_tasks: 后台任务
    :param response: 子依赖
    :param dependency_overrides_provider: app中设置的依赖替代项
    :param dependency_cache: 已完成的依赖
    :return:
    """
    values: Dict[str, Any] = {}
    errors: List[ErrorWrapper] = []
    response = response or Response(
        content=None,
        status_code=None,  # type: ignore
        headers=None,
        media_type=None,
        background=None,
    )
    dependency_cache = dependency_cache or {}

开始解依赖树

    for sub_dependant in dependant.dependencies:
        sub_dependant.call = cast(Callable, sub_dependant.call)
        sub_dependant.cache_key = cast(
            Tuple[Callable, Tuple[str]], sub_dependant.cache_key
        )
        # cast的作用是标注类型,方便提示

        call = sub_dependant.call
        use_sub_dependant = sub_dependant
        # 分别拿到依赖内容和依赖项

        if (
                dependency_overrides_provider
                and dependency_overrides_provider.dependency_overrides
        ):
            # 依赖重写时
            original_call = sub_dependant.call
            call = getattr(
                dependency_overrides_provider, "dependency_overrides", {}
            ).get(original_call, original_call)
            # 找到对应的重写
            use_path: str = sub_dependant.path  # type: ignore
            use_sub_dependant = get_dependant(
                path=use_path,
                call=call,
                name=sub_dependant.name,
                security_scopes=sub_dependant.security_scopes,
            )
            # 重新生成依赖
            use_sub_dependant.security_scopes = sub_dependant.security_scopes

        solved_result = await solve_dependencies(
            request=request,
            dependant=use_sub_dependant,
            body=body,
            background_tasks=background_tasks,
            response=response,
            dependency_overrides_provider=dependency_overrides_provider,
            dependency_cache=dependency_cache,
        )
        # 获得依赖项的子依赖的结果集

        (
            sub_values,
            sub_errors,
            background_tasks,
            _,  # 子依赖项返回与我们相同的响应
            sub_dependency_cache,
        ) = solved_result
        # 拿到结果和错误

上面这部分负责解决子依赖,拿到最后的结果集,然后用子依赖的结果集来解决自身

        dependency_cache.update(sub_dependency_cache)
        # 将子依赖已解决的内容注册
        if sub_errors:
            errors.extend(sub_errors)
            continue

        if sub_dependant.use_cache and sub_dependant.cache_key in dependency_cache:
            # 如果设置了使用缓存,且该依赖内容已被子依赖解决过
            solved = dependency_cache[sub_dependant.cache_key]
            # 直接抄答案

        # 否则照常执行
        elif is_gen_callable(call) or is_async_gen_callable(call):
            stack = request.scope.get("fastapi_astack")
            if stack is None:
                raise RuntimeError(
                    async_contextmanager_dependencies_error
                )  # pragma: no cover
            solved = await solve_generator(
                call=call, stack=stack, sub_values=sub_values
            )
            # 用子依赖得到的结果集,作为参数,传入到依赖内容中。

        elif is_coroutine_callable(call):
            solved = await call(**sub_values)
        else:
            solved = await run_in_threadpool(call, **sub_values)

        # 对于同步异步不同的执行方式

        if sub_dependant.name is not None:
            values[sub_dependant.name] = solved
            # 收集结果
        if sub_dependant.cache_key not in dependency_cache:
            dependency_cache[sub_dependant.cache_key] = solved
            # 将结果添加到结果集

解决该节点的解决每一个依赖项

    # 依赖项不再有子依赖时,会直接跳过上面的for循环
    path_values, path_errors = request_params_to_args(
        dependant.path_params, request.path_params
    )
    query_values, query_errors = request_params_to_args(
        dependant.query_params, request.query_params
    )
    header_values, header_errors = request_params_to_args(
        dependant.header_params, request.headers
    )
    cookie_values, cookie_errors = request_params_to_args(
        dependant.cookie_params, request.cookies
    )
    # 生成依赖树时,我们将需要的参数,保存到了path_params,query_params等地方。
    # 现在就是从Request中提取它们的好时机
    # 当然这个过程中也会产生错误,我们会收集这些错误
    values.update(path_values)
    values.update(query_values)
    values.update(header_values)
    values.update(cookie_values)
    errors += path_errors + query_errors + header_errors + cookie_errors
    # 合并现有错误

解决参数依赖,分别遍历依赖的个参数需求列表,与request所能提供的做匹配。
我们来看一下匹配的函数

def request_params_to_args(
        required_params: Sequence[ModelField],
        received_params: Union[Mapping[str, Any], QueryParams, Headers],
) -> Tuple[Dict[str, Any], List[ErrorWrapper]]:
    values = {}
    errors = []
    for field in required_params:
        if is_scalar_sequence_field(field) and isinstance(
                received_params, (QueryParams, Headers)
        ):
            value = received_params.getlist(field.alias) or field.default
        else:
            value = received_params.get(field.alias)
        # 分别对标准序列参数和其他参数的情况进行处理,拿到value
        # 这里未处理默认值的情况下

        field_info = field.field_info
        assert isinstance(
            field_info, params.Param
        ), "Params must be subclasses of Param"
        if value is None:
            if field.required:
                errors.append(
                    ErrorWrapper(
                        MissingError(), loc=(field_info.in_.value, field.alias)
                    )
                )
                # 必须则引发错误
            else:
                values[field.name] = deepcopy(field.default)
                # 否则使用默认值
            continue
        v_, errors_ = field.validate(
            value, values, loc=(field_info.in_.value, field.alias)
        )
        if isinstance(errors_, ErrorWrapper):
            errors.append(errors_)
        elif isinstance(errors_, list):
            errors.extend(errors_)
        else:
            values[field.name] = v_
    return values, errors
回到solve_dependencies
    if dependant.body_params:
        # 如果有user_info(user: User)这样的参数,User为Model。
        # 其会保存到body_params中,现在是处理它们的时候
        (
            body_values,
            body_errors,
        ) = await request_body_to_args(  # body_params checked above
            required_params=dependant.body_params, received_body=body
        )
        # body传入进去,进行匹配,得到结果
        values.update(body_values)
        errors.extend(body_errors)
        # 整合结果
    if dependant.http_connection_param_name:
        values[dependant.http_connection_param_name] = request
    if dependant.request_param_name and isinstance(request, Request):
        values[dependant.request_param_name] = request
    elif dependant.websocket_param_name and isinstance(request, WebSocket):
        values[dependant.websocket_param_name] = request
    # 这三者是解决需要特定参数的时候,主要是指Request或WebSocket这样的参数

    if dependant.background_tasks_param_name:
        if background_tasks is None:
            background_tasks = BackgroundTasks()
        values[dependant.background_tasks_param_name] = background_tasks
    # 后台任务
    if dependant.response_param_name:
        values[dependant.response_param_name] = response
    # 如果需要操作Response报文,这里会提供sub response,其内容最后会整合到response中

    if dependant.security_scopes_param_name:
        values[dependant.security_scopes_param_name] = SecurityScopes(
            scopes=dependant.security_scopes
        )
    # 拿到安全域
    return values, errors, background_tasks, response, dependency_cache

solve_dependencies本身也是递归函数,这和get_dependant是相辅相成的。

上一篇下一篇

猜你喜欢

热点阅读