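Query Processing Implementation
- Reference: PostgreSQL数据库内核分析 (PostgreSQL Database Kernel Analysis)
Parser (SQL parser)
The parser performs lexical analysis, syntactic analysis and semantic analysis on the SQL statement sent by the client, and finally produces a query tree.
- Lexical and syntactic analysis
Implemented with flex and bison. If the statement is syntactically correct, an abstract syntax tree is produced; otherwise an error message is returned to the client.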
For the lexical and syntactic analysis process, see: https://www.jianshu.com/p/c839440975a0
- Semantic analysis
The entry point of semantic analysis is parse_analyze in analyze.cpp. parse_analyze converts the abstract syntax tree into a query tree by calling transformTopLevelStmt, which mainly handles the SELECT ... INTO syntax (converting it into CREATE TABLE AS) and then calls transformStmt.
transformStmt checks the NodeTag and dispatches to the matching branch:
-> InsertStmt -- INSERT statements
-> DeleteStmt -- DELETE statements
-> UpdateStmt -- UPDATE statements
-> MergeStmt -- MERGE statements
-> SelectStmt -- SELECT statements
-> DeclareCursorStmt -- DECLARE CURSOR statements
-> ExplainStmt -- EXPLAIN statements
-> ExecDirectStmt -- EXECUTE DIRECT statements
-> CreateTableAsStmt -- CREATE TABLE AS, SELECT ... INTO, or CREATE MATERIALIZED VIEW statements
-> others -- no transformation; returned as-is, wrapped in a CMD_UTILITY Query
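As a rough model of this dispatch (not openGauss code), the sketch below reduces each statement to a node tag and returns a label for the routine it would be routed to; the SELECT branch also reproduces the three-way VALUES / plain SELECT / set-operation split used by the SELECT snippet. The enums, the `Stmt` struct and the `XF_*` labels are hypothetical simplifications.

```c
/* Simplified stand-ins for the real NodeTag and SetOperation enums. */
typedef enum {
    T_InsertStmt, T_DeleteStmt, T_UpdateStmt, T_MergeStmt, T_SelectStmt,
    T_DeclareCursorStmt, T_ExplainStmt, T_ExecDirectStmt, T_CreateTableAsStmt,
    T_OtherStmt /* any statement without a dedicated branch */
} NodeTag;

typedef enum { SETOP_NONE, SETOP_UNION, SETOP_INTERSECT, SETOP_EXCEPT } SetOperation;

/* Hypothetical labels for the transform routine a statement is routed to. */
typedef enum {
    XF_INSERT, XF_DELETE, XF_UPDATE, XF_MERGE,
    XF_VALUES,  /* transformValuesClause */
    XF_SELECT,  /* transformSelectStmt */
    XF_SETOP,   /* transformSetOperationStmt */
    XF_DECLARE_CURSOR, XF_EXPLAIN, XF_EXEC_DIRECT, XF_CTAS,
    XF_UTILITY  /* no analysis: wrapped in a CMD_UTILITY Query */
} TransformKind;

/* Only the SelectStmt fields the dispatch looks at. */
typedef struct {
    NodeTag tag;
    int has_values_lists; /* models valuesLists != NIL */
    SetOperation op;      /* models SelectStmt.op */
} Stmt;

TransformKind choose_transform(const Stmt *s)
{
    switch (s->tag) {
    case T_InsertStmt:        return XF_INSERT;
    case T_DeleteStmt:        return XF_DELETE;
    case T_UpdateStmt:        return XF_UPDATE;
    case T_MergeStmt:         return XF_MERGE;
    case T_SelectStmt:
        if (s->has_values_lists)
            return XF_VALUES;
        return s->op == SETOP_NONE ? XF_SELECT : XF_SETOP;
    case T_DeclareCursorStmt: return XF_DECLARE_CURSOR;
    case T_ExplainStmt:       return XF_EXPLAIN;
    case T_ExecDirectStmt:    return XF_EXEC_DIRECT;
    case T_CreateTableAsStmt: return XF_CTAS;
    default:                  return XF_UTILITY;
    }
}
```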
The SELECT statement is used as the example here:
case T_SelectStmt: {
    SelectStmt* n = (SelectStmt*)parseTree;
    if (n->valuesLists) // VALUES list
        result = transformValuesClause(pstate, n);
    else if (n->op == SETOP_NONE) // plain SELECT: this branch is taken in the example
        result = transformSelectStmt(pstate, n, isFirstNode, isCreateView);
    else // set operation: SELECT with UNION/INTERSECT/EXCEPT
        result = transformSetOperationStmt(pstate, n);
} break;
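(1) NodeTag is T_SelectStmt, so the T_SelectStmt branch is taken.
(2) n->valuesLists is empty and n->op is SETOP_NONE (not a set operation), so transformSelectStmt is called:
a. If there is a WITH clause, transform it with transformWithClause (the example has no WITH clause).
b. Transform the FROM clause with transformFromClause.
c. Transform the target list with transformTargetList.
d. If the (+) outer-join syntax is used, call transformOperatorPlus.
e. Call markTargetListOrigins: for target entries that are simple column references, it records the OID of the source table and the column's attribute number in the TargetEntry.
f. Transform the WHERE clause with transformWhereClause, which calls transformExpr to transform the various kinds of expressions.
g. Transform the HAVING clause, also with transformWhereClause.
h. Transform the ORDER BY clause with transformSortClause.
i. Transform the GROUP BY clause with transformGroupClause.
j. If there is a DISTINCT clause, process it with transformDistinctClause; otherwise, if there is a DISTINCT ON clause, process it with transformDistinctOnClause.
k. Transform the LIMIT/OFFSET clauses with transformLimitClause.
l. Transform the WINDOW clause with transformWindowDefinitions.
m. Transform the locking clause (FOR UPDATE/FOR SHARE) with transformLockingClause.
n. Return the resulting query tree.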
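The order of steps a to n matters: PostgreSQL's transformSelectStmt transforms ORDER BY before GROUP BY and DISTINCT because both of those need the sort-clause results. The following sketch, with hypothetical `SelectShape` flags and `Step` labels, only records the order in which the steps above would run for a given SELECT; it does not transform anything.

```c
#include <stddef.h>

/* Hypothetical flags describing which optional clauses a SELECT contains. */
typedef struct {
    int has_with;
    int has_operator_plus; /* (+) outer-join marker */
    int has_distinct;      /* plain DISTINCT */
    int has_distinct_on;   /* DISTINCT ON (...) */
} SelectShape;

typedef enum {
    STEP_WITH, STEP_FROM, STEP_TARGETLIST, STEP_OPERATOR_PLUS, STEP_MARK_ORIGINS,
    STEP_WHERE, STEP_HAVING, STEP_ORDER_BY, STEP_GROUP_BY,
    STEP_DISTINCT, STEP_DISTINCT_ON, STEP_LIMIT, STEP_WINDOW, STEP_LOCKING
} Step;

/* Writes the steps in the order listed above into out[] (room for 14 entries
 * is enough) and returns how many were written. */
size_t plan_select_steps(const SelectShape *s, Step out[])
{
    size_t n = 0;
    if (s->has_with)
        out[n++] = STEP_WITH;
    out[n++] = STEP_FROM;
    out[n++] = STEP_TARGETLIST;
    if (s->has_operator_plus)
        out[n++] = STEP_OPERATOR_PLUS;
    out[n++] = STEP_MARK_ORIGINS;
    out[n++] = STEP_WHERE;
    out[n++] = STEP_HAVING;   /* also via transformWhereClause */
    out[n++] = STEP_ORDER_BY; /* before GROUP BY / DISTINCT, which reuse it */
    out[n++] = STEP_GROUP_BY;
    if (s->has_distinct)
        out[n++] = STEP_DISTINCT;
    else if (s->has_distinct_on)
        out[n++] = STEP_DISTINCT_ON;
    out[n++] = STEP_LIMIT;
    out[n++] = STEP_WINDOW;
    out[n++] = STEP_LOCKING;
    return n;
}
```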
Related data structures
typedef struct Query {
NodeTag type;
CmdType commandType; /* command type: select|insert|update|delete|merge|utility */
QuerySource querySource; /* where did I come from? */
uint64 queryId; /* query identifier (can be set by plugins) */
bool canSetTag; /* do I set the command result tag? */
Node* utilityStmt; /* non-null if this is DECLARE CURSOR or a
* non-optimizable statement */
int resultRelation; /* rtable index of target relation for
* INSERT/UPDATE/DELETE/MERGE; 0 for SELECT */
bool hasAggs; /* has aggregates in target list or HAVING clause */
bool hasWindowFuncs; /* has window functions in target list */
bool hasSubLinks; /* has subquery SubLinks */
bool hasDistinctOn; /* distinctClause is from DISTINCT ON */
bool hasRecursive; /* WITH RECURSIVE was specified */
bool hasModifyingCTE; /* has INSERT/UPDATE/DELETE in WITH */
bool hasForUpdate; /* FOR UPDATE or FOR SHARE was specified */
bool hasRowSecurity; /* rewriter has applied some RLS policy */
bool hasSynonyms; /* has synonym mapping in rtable */
List* cteList; /* WITH list (of CommonTableExpr's) */
List* rtable; /* list of range table entries (RangeTblEntry) */
FromExpr* jointree; /* table join tree (FROM and WHERE clauses) */
List* targetList; /* target list (of TargetEntry) */
List* starStart; /* Corresponding p_star_start in ParseState */
List* starEnd; /* Corresponding p_star_end in ParseState */
List* starOnly; /* Corresponding p_star_only in ParseState */
List* returningList; /* RETURNING list (of TargetEntry) */
List* groupClause; /* GROUP BY clause */
List* groupingSets; /* a list of GroupingSet's if present */
Node* havingQual; /* HAVING qualification */
List* windowClause; /* WINDOW clauses */
List* distinctClause; /* DISTINCT clause */
List* sortClause; /* ORDER BY clause */
// LIMIT/OFFSET clauses
Node* limitOffset; /* # of result tuples to skip (int8 expr) */
Node* limitCount; /* # of result tuples to return (int8 expr) */
List* rowMarks; /* a list of RowMarkClause's */
// set operations
Node* setOperations; /* set-operation tree if this is top level of
* a UNION/INTERSECT/EXCEPT query */
List *constraintDeps; /* a list of pg_constraint OIDs that the query
* depends on to be semantically valid */
HintState* hintState;
#ifdef PGXC
/* need this info for PGXC Planner, may be temporary */
char* sql_statement; /* original query */
bool is_local; /* enforce query execution on local node
* this is used by EXECUTE DIRECT especially. */
bool has_to_save_cmd_id; /* true if the query is such an INSERT SELECT
* that inserts into a child by selecting
* from its parent OR a WITH query that
* updates a table in main query and inserts
* a row to the same table in WITH query */
bool vec_output; /* true if it's vec output. this flag is used in FQS planning */
TdTruncCastStatus tdTruncCastStatus; /* Auto truncation Cast added, only used for stmt in stored procedure or
prepare stmt. */
List* equalVars; /* vars appears in UPDATE/DELETE clause */
#endif
ParamListInfo boundParamsQ;
int mergeTarget_relation;
List* mergeSourceTargetList;
List* mergeActionList; /* list of actions for MERGE (only) */
Query* upsertQuery; /* insert query for INSERT ON DUPLICATE KEY UPDATE (only) */
UpsertExpr* upsertClause; /* DUPLICATE KEY UPDATE [NOTHING | ...] */
bool isRowTriggerShippable; /* true if all row triggers are shippable. */
bool use_star_targets; /* true if use * for targetlist. */
bool is_from_full_join_rewrite; /* true if the query is created when doing
* full join rewrite. If true, we should not
* do some expression processing.
* Please refer to subquery_planner.
*/
uint64 uniqueSQLId; /* used by unique sql id */
bool can_push;
bool unique_check; /* true if the subquery is generated by general
* sublink pullup, and scalar output is needed */
Oid* fixed_paramTypes; /* For plpy CTAS query. CTAS is a recursive call: the CREATE query is rewritten first,
* and the 2nd rewritten query is INSERT SELECT. Without this attribute, the DB will raise
* an error because it has no idea about $x when the INSERT SELECT query is analyzed. */
int fixed_numParams;
} Query;
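Range-table references in Query are 1-based indexes into rtable, and resultRelation is 0 for SELECT. A minimal sketch of that convention, assuming an array in place of the List and a one-field `RangeTblEntry` (both hypothetical simplifications; the real accessor is the rt_fetch macro):

```c
#include <stddef.h>

typedef enum { CMD_SELECT, CMD_UPDATE, CMD_INSERT, CMD_DELETE, CMD_MERGE, CMD_UTILITY } CmdType;

typedef struct { const char *relname; } RangeTblEntry; /* hypothetical: one field */

typedef struct {
    CmdType commandType;
    RangeTblEntry *rtable; /* array instead of a List */
    int rtable_len;
    int resultRelation;    /* 1-based index into rtable; 0 for SELECT */
} MiniQuery;

/* Like rt_fetch(): range-table indexes start at 1, so index i is element i-1. */
const RangeTblEntry *mini_rt_fetch(const MiniQuery *q, int rti)
{
    if (rti < 1 || rti > q->rtable_len)
        return NULL;
    return &q->rtable[rti - 1];
}

/* Target relation of INSERT/UPDATE/DELETE/MERGE, or NULL for SELECT. */
const RangeTblEntry *result_rte(const MiniQuery *q)
{
    return q->resultRelation == 0 ? NULL : mini_rt_fetch(q, q->resultRelation);
}
```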
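Rewrite (query rewriting)
The rewriter rewrites the query tree according to the rule system; the entry point is QueryRewrite.
(1) Call RewriteQuery, which applies all non-SELECT rules (rules on INSERT/UPDATE/DELETE) and produces zero or more query trees.
(2) For each query tree in the resulting list, call fireRIRrules, which applies all RIR (retrieve-instead-retrieve) rules to the range table entries (RangeTblEntry):
If the RTE is a subquery, fireRIRrules is called on it recursively.
Joins and other RTEs that are not plain relations, materialized views, tables not referenced by the query, and new result relations introduced by ApplyRetrieveRule are skipped.
If the RTE is a table referenced by the query, its ON SELECT rules are applied with ApplyRetrieveRule.
CTEs in the WITH clause are processed by recursive calls to fireRIRrules.
If the query tree contains sublinks, query_tree_walker walks them and fireRIRonSubLink rewrites each one.
(3) Return the rewritten query trees.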
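The per-RTE decision in step (2) can be written as a small classifier. This is a sketch under simplifying assumptions: `MiniRTE` keeps only the properties fireRIRrules checks, and `RIRAction` is a hypothetical result type; the real function performs the rewriting in place instead of returning a label.

```c
/* Hypothetical, simplified RTE: only the properties the skip logic looks at. */
typedef enum { RTE_RELATION, RTE_SUBQUERY, RTE_JOIN, RTE_FUNCTION, RTE_CTE } RTEKind;

typedef struct {
    RTEKind rtekind;
    int is_matview;        /* relation is a materialized view */
    int referenced;        /* in the join tree, used by a Var, or the result rel */
    int added_by_retrieve; /* new result relation introduced by ApplyRetrieveRule */
} MiniRTE;

typedef enum { RIR_RECURSE_SUBQUERY, RIR_SKIP, RIR_APPLY_RULES } RIRAction;

RIRAction rir_action(const MiniRTE *rte)
{
    if (rte->rtekind == RTE_SUBQUERY)
        return RIR_RECURSE_SUBQUERY; /* fireRIRrules on the subquery */
    if (rte->rtekind != RTE_RELATION)
        return RIR_SKIP;             /* joins, functions, ... */
    if (rte->is_matview || !rte->referenced || rte->added_by_retrieve)
        return RIR_SKIP;
    return RIR_APPLY_RULES;          /* apply ON SELECT rules, if any */
}
```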
Related data structures
typedef struct RewriteRule {
Oid ruleId;
CmdType event;
AttrNumber attrno;
Node* qual;
List* actions;
char enabled;
bool isInstead;
} RewriteRule;
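Rules fire only for matching events. A sketch loosely modelled on matchLocks in rewriteHandler.c, assuming the default session_replication_role (origin), under which rules marked 'O' (origin) or 'A' (always) fire, plus a check for an unconditional INSTEAD rule, which replaces the original command. `MiniRule` mirrors the RewriteRule fields above with placeholder types; it is not the real struct.

```c
#include <stddef.h>

typedef unsigned int Oid;
typedef enum { CMD_SELECT, CMD_UPDATE, CMD_INSERT, CMD_DELETE } CmdType;

/* Same fields as RewriteRule, with Node and List replaced by placeholders. */
typedef struct {
    Oid ruleId;
    CmdType event;
    short attrno;
    const void *qual;    /* NULL means an unconditional rule */
    const void *actions; /* opaque here */
    char enabled;        /* 'O' origin, 'A' always, 'R' replica, 'D' disabled */
    int isInstead;
} MiniRule;

/* Collects rules that fire for `event` under the origin replication role. */
size_t match_rules(const MiniRule *rules, size_t n, CmdType event,
                   const MiniRule **out)
{
    size_t k = 0;
    for (size_t i = 0; i < n; i++) {
        if (rules[i].event != event)
            continue;
        if (rules[i].enabled != 'O' && rules[i].enabled != 'A')
            continue;
        out[k++] = &rules[i];
    }
    return k;
}

/* True if some matched rule is an unconditional INSTEAD rule. */
int has_unconditional_instead(const MiniRule **matched, size_t k)
{
    for (size_t i = 0; i < k; i++)
        if (matched[i]->isInstead && matched[i]->qual == NULL)
            return 1;
    return 0;
}
```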
Planner
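The planner produces the optimal plan that the executor can run. Planning has three phases: preprocessing, path generation and plan generation.
The entry point is pg_plan_queries, which iterates over the query trees produced by analysis and rewriting and calls pg_plan_query on each one to generate a plan.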
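pg_plan_queries produces one output statement per input query, and utility commands need no planning, so they are passed through. A minimal sketch of that loop, where `MiniQuery`, `MiniStmt` and `mini_plan_query` are hypothetical stand-ins and no real planning happens:

```c
#include <stddef.h>

typedef enum { CMD_SELECT, CMD_UPDATE, CMD_INSERT, CMD_DELETE, CMD_UTILITY } CmdType;

typedef struct { CmdType commandType; int id; } MiniQuery;

typedef struct {
    int is_utility; /* 1: the utility statement is passed through unplanned */
    int source_id;  /* which query it came from */
} MiniStmt;

/* Stand-in for pg_plan_query(): only records where the "plan" came from. */
static MiniStmt mini_plan_query(const MiniQuery *q)
{
    MiniStmt s = {0, q->id};
    return s;
}

/* Same shape as pg_plan_queries(): one output statement per input query. */
size_t mini_plan_queries(const MiniQuery *qs, size_t n, MiniStmt *out)
{
    for (size_t i = 0; i < n; i++) {
        if (qs[i].commandType == CMD_UTILITY) {
            out[i].is_utility = 1; /* utility commands require no planning */
            out[i].source_id = qs[i].id;
        } else {
            out[i] = mini_plan_query(&qs[i]);
        }
    }
    return n;
}
```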
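- Preprocessing
Mainly pulls up sublinks and subqueries.
This is implemented mainly in subquery_planner. It calls pull_up_sublinks and pull_up_subqueries to pull up sublinks and subqueries, and uses information in the query tree to determine, for example, whether joins are present. preprocess_expression processes the target list, quals and other expressions, and preprocess_qual_conditions preprocesses the qualification conditions. HAVING conditions that contain no aggregates (and no volatile functions or subplans) are moved into WHERE. reduce_outer_joins then reduces outer joins to inner joins where possible.
- Path generation
Uses a dynamic programming algorithm or a genetic algorithm (GEQO) to generate the cheapest join path and a list of candidate paths.
- Plan generation
Builds a basic plan tree from the best path, then adds the plan nodes for clauses such as GROUP BY, HAVING and ORDER BY to form the complete plan tree.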
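To illustrate the dynamic-programming idea behind path generation, here is a toy System R style search over subsets of relations. The cost model (scan cost = row count, join cost = input costs plus the product of input row counts, one uniform join selectivity) is an assumption chosen for brevity and is not the openGauss cost model; the real search (standard_join_search) also works level by level and avoids clauseless joins where it can.

```c
#include <float.h>

#define MAX_RELS 8

/* Cheapest total cost of joining base rels 0..n-1, or -1 if n is out of range.
 * rows[i] is the row count of rel i; sel is applied to every join (toy model). */
double best_join_cost(const double *rows, int n, double sel)
{
    static double cost[1u << MAX_RELS]; /* cheapest cost per subset (bitmask) */
    static double card[1u << MAX_RELS]; /* estimated rows per subset */

    if (n < 1 || n > MAX_RELS)
        return -1.0;

    unsigned full = (1u << n) - 1;
    for (unsigned s = 1; s <= full; s++) {
        if ((s & (s - 1)) == 0) {
            /* Level 1: a single base relation; scan cost = its row count. */
            int i = 0;
            while (!(s & (1u << i)))
                i++;
            cost[s] = rows[i];
            card[s] = rows[i];
            continue;
        }
        /* Higher levels: try every split of s into two non-empty subsets.
         * Both halves are numerically smaller than s, so already computed. */
        cost[s] = DBL_MAX;
        for (unsigned l = (s - 1) & s; l > 0; l = (l - 1) & s) {
            unsigned r = s & ~l;
            double c = cost[l] + cost[r] + card[l] * card[r];
            if (c < cost[s])
                cost[s] = c;
            card[s] = card[l] * card[r] * sel; /* identical for every split */
        }
    }
    return cost[full];
}
```

With rows {10, 100, 1000} and selectivity 0.01, joining the two smaller relations first and then the large one is cheapest (cost 12110 under this model).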
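inheritance_planner and grouping_planner generate the plan tree. If the target of an UPDATE or DELETE has inheritance children, inheritance_planner is called: it generates a plan for each child relation, each ultimately by calling grouping_planner.
In grouping_planner, if setOperations is not empty, the set-operation tree is traversed and a plan is generated for each subquery. If setOperations is empty:
If there is a GROUP BY clause, reorder its items to match the order of the ORDER BY clause
Call preprocess_targetlist to process the target list
Compute the pathkeys required for grouping and sorting
Call query_planner to create paths, yielding cheapest_path and sorted_path
Choose best_path: if hashed grouping is used or sorted_path is NULL, cheapest_path is taken as the best path
Try to build an optimized plan for MIN/MAX aggregates; if that is not possible, call create_plan to generate an ordinary plan
On top of that plan, add nodes for GROUP BY, aggregation, ORDER BY, DISTINCT and LIMIT as needed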
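The last step stacks single-input nodes on top of the scan/join plan. A sketch with hypothetical node kinds, assuming the bottom-to-top order aggregation, DISTINCT (Unique), ORDER BY (Sort), LIMIT; window functions and any Sort needed to feed grouping or Unique are left out.

```c
#include <stddef.h>

typedef enum { N_SEQSCAN, N_AGG, N_UNIQUE, N_SORT, N_LIMIT } NodeKind;

typedef struct PlanNode {
    NodeKind kind;
    struct PlanNode *lefttree; /* these node types have a single input */
} PlanNode;

typedef struct {
    int has_grouping; /* GROUP BY or aggregates */
    int has_distinct;
    int has_order_by;
    int has_limit;
} UpperClauses;

/* Takes the next free node from pool and makes it the parent of child. */
static PlanNode *add_node(PlanNode *pool, size_t *used, NodeKind kind, PlanNode *child)
{
    PlanNode *n = &pool[(*used)++];
    n->kind = kind;
    n->lefttree = child;
    return n;
}

/* Wraps `plan` bottom to top: Agg -> Unique -> Sort -> Limit.
 * pool must have room for 4 nodes. Returns the new top of the tree. */
PlanNode *wrap_plan(PlanNode *plan, const UpperClauses *u, PlanNode *pool)
{
    size_t used = 0;
    if (u->has_grouping) plan = add_node(pool, &used, N_AGG, plan);
    if (u->has_distinct) plan = add_node(pool, &used, N_UNIQUE, plan);
    if (u->has_order_by) plan = add_node(pool, &used, N_SORT, plan);
    if (u->has_limit)    plan = add_node(pool, &used, N_LIMIT, plan);
    return plan;
}
```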
Related data structures
typedef struct Plan {
NodeTag type;
int plan_node_id; /* node id */
int parent_node_id; /* parent node id */
RemoteQueryExecType exec_type;
/*
* estimated execution costs for plan (see costsize.c for more info)
*/
Cost startup_cost; /* startup cost */
Cost total_cost; /* total cost */
/*
* planner's estimate of result size of this plan step
*/
double plan_rows; /* number of global rows plan is expected to emit */
double multiple;
int plan_width; /* average row width in bytes */
int dop; /* degree of parallelism of current plan */
/*
* machine learning model estimations
*/
double pred_rows;
double pred_startup_time;
double pred_total_time;
long pred_max_memory;
/*
* MPPDB Recursive-Union Support
*
* - @recursive_union_plan_nodeid
* Pointing to its belonging RecursiveUnion's plan node id to indicate if we are
* under RecursiveUnion
*
* - @recursive_union_controller
* Indicate if current Plan node is controller node in recursive-union steps
*
* - @control_plan_nodeid
* Normally, set on the top-plan node of a producer thread, to indicate which
* control-plan we need syn-up with
*
* - @is_sync_planode
* Indicate the current producer thread is the sync-up thread in recursive union,
* normally set on producer's top plan node
*
* Please note the above four variables are meaningless if a plan node is not under
* recursive-union's recursive part
*/
/*
* plan node id of RecursiveUnion node where current plan node belongs to, 0 for
* not under recursive-union
*/
int recursive_union_plan_nodeid;
/* flag to indicate if it is controller plan node */
bool recursive_union_controller;
/* plan node id of Controller plan node, 0 for not in control */
int control_plan_nodeid;
/* flag indicate if the current plan node is the sync node (for multi-stream case) */
bool is_sync_plannode;
/*
* Common structural data.
*/
List* targetlist; /* target list to be computed at this node */
List* qual; /* implicitly-ANDed qual conditions */
struct Plan* lefttree; /* input plan tree(s) */
struct Plan* righttree;
bool ispwj; /* is it special for partitionwisejoin? */
int paramno; /* the partition number that it is scanning */
List* initPlan; /* Init Plan nodes (un-correlated expr
* subselects) */
List* distributed_keys; /* distribution keys */
ExecNodes* exec_nodes; /* List of Datanodes where to execute this plan */
/*
* Information for management of parameter-change-driven rescanning
*
* extParam includes the paramIDs of all external PARAM_EXEC params
* affecting this plan node or its children. setParam params from the
* node's initPlans are not included, but their extParams are.
*
* allParam includes all the extParam paramIDs, plus the IDs of local
* params that affect the node (i.e., the setParams of its initplans).
* These are _all_ the PARAM_EXEC params that affect this node.
*/
Bitmapset* extParam;
Bitmapset* allParam;
// For the vectorized engine: the plan produces vector output
//
bool vec_output;
/*
* @hdfs
* Mark the foreign scan whether has unique results on one of its
* output columns.
*/
bool hasUniqueResults;
/*
* Mark the plan whether includes delta table or not.
*/
bool isDeltaTable;
/* used to replace work_mem, maxmem in [0], and minmem in [1] */
int operatorMemKB[2];
/* allowed max mem after spread */
int operatorMaxMem;
bool parallel_enabled; /* Is it run in parallel? */
bool hasHashFilter; /* true for this plan has a hashfilter */
List* var_list; /* Need bloom filter var list. */
List* filterIndexList; /* Need used bloomfilter array index. */
/* used to replace work_mem */
int** ng_operatorMemKBArray; /* for multiple logic cluster */
int ng_num;
double innerdistinct; /* join inner rel distinct estimation value */
double outerdistinct; /* join outer rel distinct estimation value */
} Plan;
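lefttree and righttree make the plan a binary tree that tools such as EXPLAIN walk recursively. A minimal sketch with a hypothetical `MiniPlan` that keeps only a few Plan fields, recording node ids and depths in pre-order with the outer (left) input visited first:

```c
#include <stddef.h>

/* A few Plan fields: identity, costs and the two child pointers. */
typedef struct MiniPlan {
    int plan_node_id;
    double startup_cost;
    double total_cost;
    struct MiniPlan *lefttree;  /* outer input */
    struct MiniPlan *righttree; /* inner input */
} MiniPlan;

/* Pre-order walk, outer child before inner child. Records each node's id and
 * depth starting at slot n; returns the number of slots filled so far. */
static int walk(const MiniPlan *p, int depth, int *ids, int *depths, int n)
{
    if (p == NULL)
        return n;
    ids[n] = p->plan_node_id;
    depths[n] = depth;
    n++;
    n = walk(p->lefttree, depth + 1, ids, depths, n);
    return walk(p->righttree, depth + 1, ids, depths, n);
}

int plan_preorder(const MiniPlan *root, int *ids, int *depths)
{
    return walk(root, 0, ids, depths, 0);
}
```

For a hash join whose outer input is a scan and whose inner input is a Hash over another scan, the walk yields the join first, then the outer scan, then the Hash and its scan, which matches the indentation order of EXPLAIN output.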
Executor
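Portal execution strategies
(1) PORTAL_ONE_SELECT: handles a single SELECT statement; calls the Executor module
(2) PORTAL_ONE_RETURNING: handles an UPDATE/DELETE/INSERT statement with RETURNING; calls the Executor module
(3) PORTAL_UTIL_SELECT: handles a single utility statement that returns a SELECT-like result (e.g. EXPLAIN, SHOW); calls the ProcessUtility module
(4) PORTAL_ONE_MOD_WITH: handles a SELECT whose WITH clause contains INSERT/UPDATE/DELETE; processed much like PORTAL_ONE_RETURNING; calls the Executor module
(5) PORTAL_MULTI_QUERY: a mixture of the strategies above; can process multiple atomic operations
PortalStart calls ChoosePortalStrategy to choose which strategy to use.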
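A simplified model of ChoosePortalStrategy, based on PostgreSQL's version of the function (openGauss inherits it, but its exact code may differ). `MiniStmt` and its flags are hypothetical. One consequence worth noting: a plain INSERT/UPDATE/DELETE without RETURNING runs as PORTAL_MULTI_QUERY.

```c
typedef enum {
    PORTAL_ONE_SELECT, PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH,
    PORTAL_UTIL_SELECT, PORTAL_MULTI_QUERY
} PortalStrategy;

typedef enum { CMD_SELECT, CMD_UPDATE, CMD_INSERT, CMD_DELETE, CMD_UTILITY } CmdType;

typedef struct {
    CmdType commandType;
    int canSetTag;            /* this statement sets the command result tag */
    int hasModifyingCTE;      /* WITH contains INSERT/UPDATE/DELETE */
    int hasReturning;         /* returningList != NIL */
    int utilityReturnsTuples; /* only meaningful for CMD_UTILITY */
} MiniStmt;

PortalStrategy choose_strategy(const MiniStmt *stmts, int n)
{
    if (n == 1) {
        const MiniStmt *s = &stmts[0];
        if (s->commandType == CMD_UTILITY)
            return s->utilityReturnsTuples ? PORTAL_UTIL_SELECT : PORTAL_MULTI_QUERY;
        if (s->canSetTag && s->commandType == CMD_SELECT)
            return s->hasModifyingCTE ? PORTAL_ONE_MOD_WITH : PORTAL_ONE_SELECT;
    }
    /* Otherwise: exactly one tag-setting statement, and it must have RETURNING. */
    int nSetTag = 0;
    for (int i = 0; i < n; i++) {
        if (!stmts[i].canSetTag)
            continue;
        if (++nSetTag > 1 || !stmts[i].hasReturning)
            return PORTAL_MULTI_QUERY;
    }
    return nSetTag == 1 ? PORTAL_ONE_RETURNING : PORTAL_MULTI_QUERY;
}
```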
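The utility processor (ProcessUtility) calls the appropriate handler according to the node type.
Optimizable statements are executed from the plan tree via ExecutorStart, ExecutorRun and ExecutorEnd.
ExecutorStart mainly calls standard_ExecutorStart, which initializes the EState with CreateExecutorState and recursively initializes every node of the plan tree with ExecInitNode.
ExecutorRun calls ExecutePlan, which repeatedly calls ExecProcNode on the top plan node; each node in turn pulls tuples from its children through ExecProcNode.
ExecutorEnd performs cleanup after execution finishes.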
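ExecProcNode implements a demand-driven (Volcano-style) iterator: each call returns one tuple pulled from below. A toy sketch of that protocol with two hypothetical node types, a scan over an int array and a Limit, where function pointers stand in for ExecProcNode's dispatch; the ExecutorEnd step has nothing to release here.

```c
#include <stddef.h>

typedef struct { int ok; int value; } Slot; /* ok == 0 plays the role of an empty slot */

typedef struct Node Node;
struct Node {
    Slot (*next)(Node *self); /* stands in for ExecProcNode's dispatch */
    Node *lefttree;           /* child node (Limit only) */
    const int *data;          /* scan state: the "table" */
    size_t len, pos;
    long limit, emitted;      /* limit state */
};

static Slot scan_next(Node *n)
{
    Slot s = {0, 0};
    if (n->pos < n->len) {
        s.ok = 1;
        s.value = n->data[n->pos++];
    }
    return s;
}

static Slot limit_next(Node *n)
{
    Slot s = {0, 0};
    if (n->emitted >= n->limit)
        return s;                       /* done: stop pulling from the child */
    s = n->lefttree->next(n->lefttree); /* demand-driven pull from below */
    if (s.ok)
        n->emitted++;
    return s;
}

/* ExecInitNode analogues: set up per-node state before any tuple flows. */
void init_scan(Node *n, const int *data, size_t len)
{
    *n = (Node){ .next = scan_next, .data = data, .len = len };
}

void init_limit(Node *n, Node *child, long limit)
{
    *n = (Node){ .next = limit_next, .lefttree = child, .limit = limit };
}

/* ExecutePlan analogue: pull from the top node until it returns an empty slot. */
size_t run_plan(Node *top, int *out, size_t cap)
{
    size_t k = 0;
    while (k < cap) {
        Slot s = top->next(top);
        if (!s.ok)
            break;
        out[k++] = s.value;
    }
    return k;
}
```

A Limit of 3 over a five-row scan returns three rows and leaves the scan positioned after the third row, because the Limit stops pulling once it has enough.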
Related data structures
typedef struct EState {
NodeTag type;
/* Basic state for all query types: */
ScanDirection es_direction; /* current scan direction */
Snapshot es_snapshot; /* time qual to use */
Snapshot es_crosscheck_snapshot; /* crosscheck time qual for RI */
List* es_range_table; /* List of RangeTblEntry */
PlannedStmt* es_plannedstmt; /* link to top of plan tree */
JunkFilter* es_junkFilter; /* top-level junk filter, if any */
/* If query can insert/delete tuples, the command ID to mark them with */
CommandId es_output_cid;
/* Info about target table(s) for insert/update/delete queries: */
ResultRelInfo* es_result_relations; /* array of ResultRelInfos */
int es_num_result_relations; /* length of array */
ResultRelInfo* es_result_relation_info; /* currently active array elt */
Relation esCurrentPartition;
HTAB* esfRelations; /* for UPDATE/DELETE, caches the Relations obtained from partitionGetRelation */
#ifdef PGXC
struct PlanState* es_result_remoterel; /* currently active remote rel */
struct PlanState* es_result_insert_remoterel; /* currently active remote rel */
struct PlanState* es_result_update_remoterel; /* currently active remote rel */
struct PlanState* es_result_delete_remoterel; /* currently active remote rel */
#endif
/* Stuff used for firing triggers: */
List* es_trig_target_relations; /* trigger-only ResultRelInfos */
TupleTableSlot* es_trig_tuple_slot; /* for trigger output tuples */
TupleTableSlot* es_trig_oldtup_slot; /* for TriggerEnabled */
TupleTableSlot* es_trig_newtup_slot; /* for TriggerEnabled */
/* Parameter info: */
ParamListInfo es_param_list_info; /* values of external params */
ParamExecData* es_param_exec_vals; /* values of internal params */
/* Other working state: */
MemoryContext es_query_cxt; /* per-query context in which EState lives */
MemoryContext es_const_query_cxt; /* const per-query context used to create node context */
List* es_tupleTable; /* List of TupleTableSlots */
List* es_rowMarks; /* List of ExecRowMarks */
uint64 es_processed; /* # of tuples processed */
uint64 deleteLimitCount; /* delete Limit */
uint64 es_last_processed; /* last value of es_processed for ModifyTable plan*/
Oid es_lastoid; /* last oid processed (by INSERT) */
int es_top_eflags; /* eflags passed to ExecutorStart */
int es_instrument; /* OR of InstrumentOption flags */
bool es_finished; /* true when ExecutorFinish is done */
List* es_exprcontexts; /* List of ExprContexts within EState */
List* es_subplanstates; /* List of PlanState for SubPlans */
List* es_auxmodifytables; /* List of secondary ModifyTableStates */
List* es_remotequerystates; /* List of RemoteQueryStates */
/*
* this ExprContext is for per-output-tuple operations, such as constraint
* checks and index-value computations. It will be reset for each output
* tuple. Note that it will be created only if needed.
*/
ExprContext* es_per_tuple_exprcontext;
/*
* These fields are for re-evaluating plan quals when an updated tuple is
* substituted in READ COMMITTED mode. es_epqTuple[] contains tuples that
* scan plan nodes should return instead of whatever they'd normally
* return, or NULL if nothing to return; es_epqTupleSet[] is true if a
* particular array entry is valid; and es_epqScanDone[] is state to
* remember if the tuple has been returned already. Arrays are of size
* list_length(es_range_table) and are indexed by scan node scanrelid - 1.
*/
HeapTuple* es_epqTuple; /* array of EPQ substitute tuples */
bool* es_epqTupleSet; /* true if EPQ tuple is provided */
bool* es_epqScanDone; /* true if EPQ tuple has been fetched */
List* es_subplan_ids;
bool es_skip_early_free; /* true if we don't apply early free mechanism, especially for subplan */
/* true if we don't apply early-free-consumer mechanism, especially for subplan */
bool es_skip_early_deinit_consumer;
bool es_under_subplan; /* true if operator is under a subplan */
List* es_material_of_subplan; /* List of Materialize operator of subplan */
bool es_recursive_next_iteration; /* true if under recursive-stream and need to rescan. */
/* data redistribution for DFS table.
* dataDestRelIndex is index into the range table. This variable
* will take effect on data redistribution state.
*/
Index dataDestRelIndex;
BloomFilterControl es_bloom_filter; /* bloom filter controller */
bool es_can_realtime_statistics; /* true if can realtime statistics */
bool es_can_history_statistics; /* true if can history statistics */
bool isRowTriggerShippable; /* true if all row triggers are shippable. */
#ifdef ENABLE_MOT
JitExec::JitContext* mot_jit_context; /* MOT JIT context required for executing LLVM jitted code */
#endif
PruningResult* pruningResult;
} EState;