An Introduction to the Calcite SQL Optimizer
SQL Optimizers in Depth
Whether the frontend is Calcite or ANTLR, the path from SQL text to a physical execution plan is essentially the same, so this article only uses Calcite for the walkthrough.
How is a SQL compute engine implemented?
From SQL to the engine's physical execution plan with Calcite
SQL compute engine workflow
- SqlParser parses the text and produces an AST (abstract syntax tree)
- Lexical analysis splits the text into tokens; syntactic analysis applies the grammar rules and builds the syntax tree
- The syntax tree is validated (data types, fields, catalog, and so on)
- A logical tree (logical plan) is built
- The logical tree is optimized
- Starting from the root of the logical tree, it is recursively converted into a physical execution plan (a minimal Calcite sketch of these steps follows).
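To make these steps concrete, here is a minimal sketch that drives the parse, validate and logical-tree steps through Calcite's Planner API. It assumes a recent Apache Calcite version; the empty schema and the query are illustrative only and are not part of the pipeline described above.
```java
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalcitePipelineSketch {
  public static void main(String[] args) throws Exception {
    // An empty root schema; a real engine registers its catalog/tables here.
    SchemaPlus rootSchema = Frameworks.createRootSchema(true);
    FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(rootSchema)
        .build();
    Planner planner = Frameworks.getPlanner(config);

    SqlNode ast = planner.parse("SELECT 100 + 200 AS tag");  // lexical + syntactic analysis -> AST
    SqlNode validated = planner.validate(ast);               // validation: types, fields, catalog
    RelNode logicalTree = planner.rel(validated).rel;        // AST -> logical tree (relational algebra)

    // The logical tree is what the optimizer rewrites before it is turned into
    // a physical execution plan.
    System.out.println(RelOptUtil.toString(logicalTree));
  }
}
```
A real engine would register its catalog before validation and hand the resulting RelNode tree to the optimizer discussed below.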
Flink SQL implementation based on Calcite
Spark SQL implementation based on ANTLR
Spark SQL workflow
1. The sqlText is parsed by SqlParser into an Unresolved LogicalPlan;
2. The analyzer module binds it against the catalog and produces a Resolved LogicalPlan;
3. The optimizer module optimizes the Resolved LogicalPlan into an Optimized LogicalPlan;
4. SparkPlan converts the LogicalPlan into a PhysicalPlan;
5. prepareForExecution() turns the PhysicalPlan into an executable physical plan;
6. execute() runs the executable physical plan (the individual stages can be inspected as in the sketch below).
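For comparison, Spark exposes each of these stages on a Dataset's QueryExecution, so the whole chain can be inspected directly. This is a minimal sketch assuming a local SparkSession; the query is illustrative.
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.QueryExecution;

public class SparkPlanStages {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("plan-stages").getOrCreate();

    Dataset<Row> df = spark.sql("SELECT 100 + 200 AS tag");
    QueryExecution qe = df.queryExecution();

    System.out.println(qe.logical());        // unresolved LogicalPlan (after SqlParser)
    System.out.println(qe.analyzed());       // resolved LogicalPlan (analyzer + catalog binding)
    System.out.println(qe.optimizedPlan());  // optimized LogicalPlan
    System.out.println(qe.executedPlan());   // physical plan prepared for execution
    spark.stop();
  }
}
```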
As you can see, whichever parser is used, the overall workflow of a SQL compute engine is essentially the same.
Because the path from SQL to the logical tree is relatively straightforward, it is not expanded here; the focus of this article is the SQL optimizer. Since optimizer implementations and techniques are largely similar, only one is covered.
Calcite Optimizer
Based on the algorithms they use, SQL optimizers fall into two categories:
- RBO, the rule-based optimizer (algorithm: traversal)
- CBO, the cost-based optimizer (algorithm: dynamic programming)
Both RBO and CBO are driven by optimization rules, which are equivalence transformations of relational expressions. Common rules include:
- Predicate Pushdown
- Constant Folding
- Column Pruning
- and others
The RBO approach
- First, traverse the rule group
A rule group defines the order in which the nodes of the tree are traversed. The default is depth-first; the available orders are:
ARBITRARY: match in any order (this is efficient, and most rules do not care about the matching order);
BOTTOM_UP: bottom-up, matching starts from the child nodes;
TOP_DOWN: top-down, matching starts from the parent nodes;
DEPTH_FIRST: depth-first matching, which in some cases is more efficient than ARBITRARY (it avoids restarting the match from the root whenever a new vertex is produced).
Matching algorithm:
Start from one node and try every Rule in the rule set; whenever a rule matches, apply the transformation. Once this node is done, move on to the next node. The matching order above refers to this node traversal order.
In short: apply the rules of the rule group to the tree in turn; whenever a node matches a rule, rewrite it, then continue with the next node, until every node in the tree has been visited.
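In Calcite, this rule-group traversal is configured on a HepProgram and executed by the HepPlanner. Below is a sketch of building a rule group with an explicit match order; it assumes a reasonably recent Calcite version (the CoreRules constants appeared around 1.23, and older versions expose the same rules as static fields on the individual rule classes), and the chosen rules are only examples.
```java
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public class RuleGroupSketch {
  static RelNode optimize(RelNode logicalTree) {
    HepProgram program = new HepProgramBuilder()
        .addMatchOrder(HepMatchOrder.BOTTOM_UP)          // node traversal order for the rules below
        .addMatchLimit(HepProgram.MATCH_UNTIL_FIXPOINT)  // keep going until no rule matches any more
        .addRuleInstance(CoreRules.FILTER_INTO_JOIN)     // a predicate-pushdown rule
        .addRuleInstance(CoreRules.PROJECT_MERGE)        // a projection-related rule
        .build();

    HepPlanner planner = new HepPlanner(program);
    planner.setRoot(logicalTree);   // the tree whose nodes will be visited and matched
    return planner.findBestExp();   // runs the traversal/matching loop shown later in this article
  }
}
```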
Here is a concrete SQL example:
```sql
create table user (
id BIGINT,
name STRING,
liveRoomId BIGINT,
age BIGINT
);
create table anchor (
id BIGINT,
name STRING,
sex STRING
);
select 100 + 200 as tag, user.name from user join anchor on user.liveRoomId = anchor.id where user.age > 30 and anchor.sex = 'female';
```
Under RBO, we apply predicate pushdown, constant folding, and column pruning to the SQL above, assuming all three rules belong to the same rule group.
RBO keeps matching rules in this way until the match limit is reached, or until a full pass over the tree produces no further rule matches.
The CBO approach
Relational expressions are transformed according to the optimization rules; here a transformation means that applying a rule to one relational expression produces another, equivalent relational expression, while the original is also kept. After a series of transformations there are many candidate execution plans; CBO then uses statistics and a cost model to compute the cost of each plan and picks the one with the lowest cost.
It is implemented mainly with dynamic programming combined with greedy heuristics.
1. Rule optimization is applied to every subtree; the resulting execution plans are kept and the cost of each plan is computed.
2. The minimum cost of a parent node is the cheapest combination of its subtrees' minimum costs (dynamic programming plus the greedy heuristic), as the sketch below illustrates.
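As a sketch of this cost-based flow, the snippet below registers a few rules that generate equivalent (physical) expressions, requests a target convention, and lets the planner pick the cheapest plan from its cost model. It assumes the RelNode's cluster was created with a VolcanoPlanner, and the Enumerable rules are illustrative choices.
```java
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;

public class CostBasedSketch {
  static RelNode optimize(RelNode logicalTree) {
    // Assumes the cluster that produced logicalTree was built on a VolcanoPlanner.
    VolcanoPlanner planner = (VolcanoPlanner) logicalTree.getCluster().getPlanner();
    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

    // Each rule may add equivalent (physical) expressions; all candidates are kept.
    planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);

    // Ask for the Enumerable convention; the planner costs every candidate using
    // statistics plus the cost model and returns the cheapest plan.
    RelTraitSet desired = logicalTree.getTraitSet().replace(EnumerableConvention.INSTANCE);
    planner.setRoot(planner.changeTraits(logicalTree, desired));
    return planner.findBestExp();
  }
}
```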
Differences
CBO is generally superior to RBO, because RBO is a rigid optimizer that only follows its rules and knows nothing about the data, whereas in practice the data keeps changing, so the plan produced by RBO may well not be optimal. Indeed, most databases and big-data compute engines now favor CBO. For streaming engines, however, applying CBO is still difficult, because the data volume and other statistics cannot be known in advance, which greatly limits the optimization; CBO is therefore used mainly in offline (batch) scenarios.
Rule optimization in Flink SQL
Flink contains many rule groups. Most of them are applied in RBO fashion; only the "logical" and "physical" optimization phases use the VolcanoPlanner (CBO). In other words, the logical tree is first optimized rule group by rule group with the rule-based approach, and then optimized once more with the cost-based approach.
Analysis of the Calcite code that implements this.
To implement rule-based optimization, two problems must be solved (a minimal rule skeleton follows this list):
- deciding whether the relational tree matches a rule;
- once it matches, deciding how the tree should be transformed.
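These two problems map directly onto Calcite's RelOptRule API: the operand pattern together with matches() answers the first, and onMatch() answers the second. Below is a minimal, hypothetical skeleton (class name and rewrite logic are placeholders) written in the classic RelOptRule style, which the Flink rule shown later in this article also uses; newer Calcite versions offer the RelRule API as an alternative.
```java
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.TableScan;

public class MyPushDownRule extends RelOptRule {
  public static final MyPushDownRule INSTANCE = new MyPushDownRule();

  private MyPushDownRule() {
    // Problem 1 (shape): match a Filter whose input is a TableScan.
    super(operand(Filter.class, operand(TableScan.class, none())), "MyPushDownRule");
  }

  @Override public boolean matches(RelOptRuleCall call) {
    // Problem 1 (fine-grained): extra conditions beyond the operand shape.
    Filter filter = call.rel(0);
    return filter.getCondition() != null;
  }

  @Override public void onMatch(RelOptRuleCall call) {
    // Problem 2: build an equivalent expression and hand it back to the planner.
    Filter filter = call.rel(0);
    TableScan scan = call.rel(1);
    // ... a real rule would construct a rewritten subtree from filter/scan here ...
    call.transformTo(filter);  // placeholder: register the (unchanged) node as the result
  }
}
```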
Let's look at how RBO is implemented in Calcite.
HepPlanner (the RBO implementation)
```java
private void executeProgram(HepProgram program) {
HepProgram savedProgram = this.currentProgram;
this.currentProgram = program;
this.currentProgram.initialize(program == this.mainProgram);
//Get all the rules in this rule group
UnmodifiableIterator var3 = this.currentProgram.instructions.iterator();
//Iterate and apply the rules one by one
while(var3.hasNext()) {
//The wrapper around one concrete rule
HepInstruction instruction = (HepInstruction)var3.next();
//Execute the optimization
instruction.execute(this);
int delta = this.nTransformations - this.nTransformationsLastGC;
if (delta > this.graphSizeLastGC) {
this.collectGarbage();
}
}
this.currentProgram = savedProgram;
}
void executeInstruction(RuleInstance instruction) {
if (!this.skippingGroup()) {
if (instruction.rule == null) {
assert instruction.ruleDescription != null;
instruction.rule = this.getRuleByDescription(instruction.ruleDescription);
LOGGER.trace("Looking up rule with description {}, found {}", instruction.ruleDescription, instruction.ru
}
if (instruction.rule != null) {
//The rule is non-null: apply it
this.applyRules(Collections.singleton(instruction.rule), true);
}
}
}
private void applyRules(Collection<RelOptRule> rules, boolean forceConversions) {
if (this.currentProgram.group != null) {
assert this.currentProgram.group.collecting;
this.currentProgram.group.ruleSet.addAll(rules);
} else {
LOGGER.trace("Applying rule set {}", rules);
boolean fullRestartAfterTransformation = this.currentProgram.matchOrder != HepMatchOrder.ARBITRARY && this.currentProgram.matchOrder != HepMatchOrder.DEPTH_FIRST;
int nMatches = 0;
boolean fixedPoint;
label58:
do {
//Get an iterator over the logical tree's vertices in the configured traversal order
Iterator<HepRelVertex> iter = this.getGraphIterator(this.root);
fixedPoint = true;
while(true) {
while(true) {
if (!iter.hasNext()) {
continue label58;
}
//Visit the next vertex
HepRelVertex vertex = (HepRelVertex)iter.next();
//Get all the current rules
Iterator var8 = rules.iterator();
//Iterate over the rules
while(var8.hasNext()) {
RelOptRule rule = (RelOptRule)var8.next();
//Apply the rule to this vertex; returns the rewritten vertex, or the same vertex/null if nothing changed
HepRelVertex newVertex = this.applyRule(rule, vertex, forceConversions);
if (newVertex != null && newVertex != vertex) {
++nMatches;
if (nMatches >= this.currentProgram.matchLimit) {
return;
}
if (fullRestartAfterTransformation) {
iter = this.getGraphIterator(this.root);
} else {
iter = this.getGraphIterator(newVertex);
if (this.currentProgram.matchOrder == HepMatchOrder.DEPTH_FIRST) {
nMatches = this.depthFirstApply(iter, rules, forceConversions, nMatches);
if (nMatches >= this.currentProgram.matchLimit) {
return;
}
}
fixedPoint = false;
}
break;
}
}
}
}
} while(!fixedPoint);
}
}
private Iterator<HepRelVertex> getGraphIterator(HepRelVertex start) {
this.collectGarbage();
switch(this.currentProgram.matchOrder) {
case ARBITRARY:
case DEPTH_FIRST:
return DepthFirstIterator.of(this.graph, start).iterator();
case TOP_DOWN:
assert start == this.root;
return TopologicalOrderIterator.of(this.graph).iterator();
case BOTTOM_UP:
default:
assert start == this.root;
List<HepRelVertex> list = new ArrayList();
Iterator var3 = TopologicalOrderIterator.of(this.graph).iterator();
while(var3.hasNext()) {
HepRelVertex vertex = (HepRelVertex)var3.next();
list.add(vertex);
}
Collections.reverse(list);
return list.iterator();
}
}
private boolean belongsToDag(HepRelVertex vertex) {
Pair<String, RelDataType> key = key(vertex.getCurrentRel());
return this.mapDigestToVertex.get(key) != null;
}
private HepRelVertex applyRule(RelOptRule rule, HepRelVertex vertex, boolean forceConversions) {
if (!this.belongsToDag(vertex)) {
return null;
} else {
RelTrait parentTrait = null;
List<RelNode> parents = null;
if (rule instanceof ConverterRule) {
ConverterRule converterRule = (ConverterRule)rule;
if (converterRule.isGuaranteed() || !forceConversions) {
if (!this.doesConverterApply(converterRule, vertex)) {
return null;
}
parentTrait = converterRule.getOutTrait();
}
} else if (rule instanceof CommonRelSubExprRule) {
List<HepRelVertex> parentVertices = this.getVertexParents(vertex);
if (parentVertices.size() < 2) {
return null;
}
parents = new ArrayList();
Iterator var7 = parentVertices.iterator();
while(var7.hasNext()) {
HepRelVertex pVertex = (HepRelVertex)var7.next();
parents.add(pVertex.getCurrentRel());
}
}
List<RelNode> bindings = new ArrayList();
Map<RelNode, List<RelNode>> nodeChildren = new HashMap();
boolean match = this.matchOperands(rule.getOperand(), vertex.getCurrentRel(), bindings, nodeChildren);
if (!match) {
return null;
} else {
HepRuleCall call = new HepRuleCall(this, rule.getOperand(), (RelNode[])bindings.toArray(new RelNode[0]), nodeChildren, parents);
//Check whether this vertex can actually be optimized by the rule
if (!rule.matches(call)) {
return null;
} else {
//It can: fire the rule
this.fireRule(call);
return !call.getResults().isEmpty() ? this.applyTransformationResults(vertex, call, parentTrait) : null;
}
}
}
}
protected void fireRule(RelOptRuleCall ruleCall) {
this.checkCancel();
assert ruleCall.getRule().matches(ruleCall);
if (this.isRuleExcluded(ruleCall.getRule())) {
LOGGER.debug("call#{}: Rule [{}] not fired due to exclusion filter", ruleCall.id, ruleCall.getRule());
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("call#{}: Apply rule [{}] to {}", new Object[]{ruleCall.id, ruleCall.getRule(), Arrays.toString(ruleCall.rels)});
}
RuleAttemptedEvent event;
if (this.listener != null) {
event = new RuleAttemptedEvent(this, ruleCall.rel(0), ruleCall, true);
this.listener.ruleAttempted(event);
}
//This is where the rule actually performs the rewrite
ruleCall.getRule().onMatch(ruleCall);
if (this.listener != null) {
event = new RuleAttemptedEvent(this, ruleCall.rel(0), ruleCall, false);
this.listener.ruleAttempted(event);
}
}
}
```
Here is an example: Flink's PushFilterIntoTableSourceScanRule.
The Flink PushFilterIntoTableSourceScanRule implementation
```scala
//Extends RelOptRule
class PushFilterIntoTableSourceScanRule extends RelOptRule(
operand(classOf[Filter],
operand(classOf[LogicalTableScan], none)),
"PushFilterIntoTableSourceScanRule") {
//Override matches: decides whether this node can be optimized by the rule
override def matches(call: RelOptRuleCall): Boolean = {
val config = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
if (!config.getConfiguration.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
return false
}
val filter: Filter = call.rel(0)
if (filter.getCondition == null) {
return false
}
val scan: LogicalTableScan = call.rel(1)
scan.getTable.unwrap(classOf[TableSourceTable[_]]) match {
case table: TableSourceTable[_] =>
table.tableSource match {
//If the table is a TableSourceTable whose source is a FilterableTableSource and the filter has not been pushed down yet, return true.
case source: FilterableTableSource[_] => !source.isFilterPushedDown
case _ => false
}
case _ => false
}
}
//The match succeeded: perform the rewrite.
override def onMatch(call: RelOptRuleCall): Unit = {
val filter: Filter = call.rel(0)
val scan: LogicalTableScan = call.rel(1)
val table: TableSourceTable[_] = scan.getTable.asInstanceOf[TableSourceTable[_]]
pushFilterIntoScan(call, filter, scan, table)
}
private def pushFilterIntoScan(
call: RelOptRuleCall,
filter: Filter,
scan: LogicalTableScan,
relOptTable: FlinkPreparingTableBase): Unit = {
val relBuilder = call.builder()
val context = call.getPlanner.getContext.unwrap(classOf[FlinkContext])
val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan)
val (predicates, unconvertedRexNodes) =
RexNodeExtractor.extractConjunctiveConditions(
filter.getCondition,
maxCnfNodeCount,
filter.getInput.getRowType.getFieldNames,
relBuilder.getRexBuilder,
context.getFunctionCatalog,
context.getCatalogManager,
TimeZone.getTimeZone(scan.getCluster.getPlanner.getContext
.unwrap(classOf[FlinkContext]).getTableConfig.getLocalTimeZone))
if (predicates.isEmpty) {
// no condition can be translated to expression
return
}
val remainingPredicates = new util.LinkedList[Expression]()
predicates.foreach(e => remainingPredicates.add(e))
val newRelOptTable: FlinkPreparingTableBase =
applyPredicate(remainingPredicates, relOptTable, relBuilder.getTypeFactory)
val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
if (newTableSource.asInstanceOf[FilterableTableSource[_]].isFilterPushedDown
&& newTableSource.explainSource().equals(oldTableSource.explainSource)) {
throw new TableException("Failed to push filter into table source! "
+ "table source with pushdown capability must override and change "
+ "explainSource() API to explain the pushdown applied!")
}
val newScan = new LogicalTableScan(scan.getCluster, scan.getTraitSet, newRelOptTable)
// check whether the framework still needs to apply a filter
if (remainingPredicates.isEmpty && unconvertedRexNodes.isEmpty) {
call.transformTo(newScan)
} else {
relBuilder.push(scan)
val converter = new ExpressionConverter(relBuilder)
val remainingConditions = remainingPredicates.map(_.accept(converter)) ++ unconvertedRexNodes
val remainingCondition = remainingConditions.reduce((l, r) => relBuilder.and(l, r))
//Build a new Filter from the remaining conditions
val newFilter = filter.copy(filter.getTraitSet, newScan, remainingCondition)
//The new LogicalFilter takes part in subsequent rule optimization
call.transformTo(newFilter)
}
}
//Decide which conditions the table source itself will handle
private def applyPredicate(
predicates: util.List[Expression],
relOptTable: FlinkPreparingTableBase,
typeFactory: RelDataTypeFactory): FlinkPreparingTableBase = {
val originPredicatesSize = predicates.size()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val filterableSource = tableSourceTable.tableSource.asInstanceOf[FilterableTableSource[_]]
//The source consumes the predicates it can handle.
val newTableSource = filterableSource.applyPredicate(predicates)
val updatedPredicatesSize = predicates.size()
val statistic = tableSourceTable.getStatistic
val newStatistic = if (originPredicatesSize == updatedPredicatesSize) {
// Keep all Statistics if no predicates can be pushed down
statistic
} else if (statistic == FlinkStatistic.UNKNOWN) {
statistic
} else {
// Remove tableStats after predicates pushed down
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
}
tableSourceTable.copy(newTableSource, newStatistic)
}
}
object PushFilterIntoTableSourceScanRule {
val INSTANCE: RelOptRule = new PushFilterIntoTableSourceScanRule
}
```
The main steps:
1. If the scanned table is backed by a FilterableTableSource and the filter has not been pushed down yet, the rule matches.
2. The rule is applied to the matched nodes.
3. The FilterableTableSource consumes the predicates that are pushed down to it.
4. The remaining predicates are used to build a new LogicalFilter, which continues through the other optimization rules (a sketch of wiring such a rule into a rule group follows).
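To close the loop, here is a sketch of how such a pushdown rule would be registered in a rule group and applied to a logical tree with the HepPlanner. This is not Flink's actual wiring (Flink goes through its own program and rule-set abstractions); MyPushDownRule from the earlier skeleton stands in for the rule instance.
```java
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;

public class ApplyPushdownSketch {
  static RelNode run(RelNode logicalTree) {
    HepProgram program = new HepProgramBuilder()
        // In Flink this slot would hold the PushFilterIntoTableSourceScanRule instance;
        // here we reuse the hypothetical rule from the earlier skeleton.
        .addRuleInstance(MyPushDownRule.INSTANCE)
        .build();
    HepPlanner planner = new HepPlanner(program);
    planner.setRoot(logicalTree);
    // Filter/scan pairs are matched, predicates are pushed into the source,
    // and any remaining predicates stay in a new LogicalFilter above the scan.
    return planner.findBestExp();
  }
}
```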