[Spark Series 11] Spark Dynamic Partition Pruning, Part 2 (Dynamic partition pruning)
2021-01-12
鸿乃江边鸟
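Background
This article is based on Delta 0.7.0 and Spark 3.0.1.
Spark 3.x introduced dynamic partition pruning. In the previous article, Spark Dynamic Partition Pruning, Part 1 (Dynamic partition pruning): the logical plan, we saw that a DynamicPruningSubquery is inserted during the logical planning phase. This article looks at how DynamicPruningSubquery is optimized and implemented during the physical planning phase.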
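Before going into the physical plan, a toy sketch of the effect dynamic partition pruning is after may help: the filtered dimension (build) side of the join is evaluated first, and its join-key values are used to skip partitions of the partitioned fact (probe) table before it is scanned. This is plain Scala with no Spark dependency; FactPartition, DimRow and DppConcept are hypothetical names that only model the effect, not Spark's implementation.

```scala
// Toy model of dynamic partition pruning; these types are hypothetical, not Spark classes.
// A fact table partitioned by storeId, and a small dimension table.
final case class FactPartition(storeId: Int, rows: Seq[Double])
final case class DimRow(storeId: Int, country: String)

object DppConcept {
  // Returns the fact partitions that still have to be scanned for the join
  // fact.storeId = dim.storeId when a filter is applied on the dimension side.
  def prunePartitions(
      fact: Seq[FactPartition],
      dim: Seq[DimRow],
      dimFilter: DimRow => Boolean): Seq[FactPartition] = {
    // 1. Evaluate the filtered dimension side first and collect its join keys.
    val keys: Set[Int] = dim.filter(dimFilter).map(_.storeId).toSet
    // 2. Use those keys as a partition filter on the fact side, so partitions
    //    that cannot produce any join match are never read.
    fact.filter(p => keys.contains(p.storeId))
  }
}
```

With partitions for storeId 1, 2 and 3 and a dimension filter that keeps only the rows for storeId 1 and 3, prunePartitions returns the partitions for 1 and 3, and partition 2 is skipped.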
Analysis
Let's go straight to the apply method of PlanDynamicPruningFilters:
```scala
override def apply(plan: SparkPlan): SparkPlan = {
  if (!SQLConf.get.dynamicPartitionPruningEnabled) {
    return plan
  }

  plan transformAllExpressions {
    case DynamicPruningSubquery(
        value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
      val sparkPlan = QueryExecution.createSparkPlan(
        sparkSession, sparkSession.sessionState.planner, buildPlan)
      // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
      // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
      val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
        plan.find {
          case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
            left.sameResult(sparkPlan)
          case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
            right.sameResult(sparkPlan)
          case _ => false
        }.isDefined

      if (canReuseExchange) {
        val mode = broadcastMode(buildKeys, buildPlan)
        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
        // plan a broadcast exchange of the build side of the join
        val exchange = BroadcastExchangeExec(mode, executedPlan)
        val name = s"dynamicpruning#${exprId.id}"
        // place the broadcast adaptor for reusing the broadcast results on the probe side
        val broadcastValues =
          SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
        DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
      } else if (onlyInBroadcast) {
        // it is not worthwhile to execute the query, so we fall-back to a true literal
        DynamicPruningExpression(Literal.TrueLiteral)
      } else {
        // we need to apply an aggregate on the buildPlan in order to be column pruned
        val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
        val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
        DynamicPruningExpression(expressions.InSubquery(
          Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
      }
  }
}
```
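- If dynamic partition pruning is disabled (spark.sql.optimizer.dynamicPartitionPruning.enabled is false), the plan is returned unchanged.
- QueryExecution.createSparkPlan(sparkSession, sparkSession.sessionState.planner, buildPlan) builds a physical plan (sparkPlan) from the build-side logical plan.
- Decide whether the exchange can be reused (canReuseExchange). This is the case when spark.sql.exchange.reuse is true, buildKeys is non-empty, and the plan contains a BroadcastHashJoinExec whose build side (left for BuildLeft, right for BuildRight) produces the same result as sparkPlan according to sameResult. If so, the next two steps are taken.
- Prepare the physical plan for execution with QueryExecution.prepareExecutedPlan to obtain executedPlan.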
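- Build a BroadcastExchangeExec over executedPlan (with the broadcast mode derived from buildKeys by broadcastMode), wrap it in a SubqueryBroadcastExec named dynamicpruning#<exprId> (broadcastValues), and produce DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)). BroadcastExchangeExec is the operator that actually performs Spark's broadcast.
  Note: this BroadcastExchangeExec is optimized by the ReuseExchange rule and is eventually invoked by BroadcastQueryStageExec, so the join and the pruning filter share the same broadcast value.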
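- Otherwise, if onlyInBroadcast is true, the filter falls back to DynamicPruningExpression(Literal.TrueLiteral), i.e. no pruning is done, because running the build side a second time just for pruning is not considered worthwhile. onlyInBroadcast is set during logical planning; it is true when pruning was estimated to have no benefit or when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is true (the default).
- Otherwise (the exchange cannot be reused, for example because the join is not a broadcast hash join, but pruning is still expected to speed up the query), an Aggregate grouped by the pruning key buildKeys(broadcastKeyIndex) is placed on top of buildPlan, so that only that column is read (column pruning) and each key value appears once. The result is wrapped as an InSubquery inside a DynamicPruningExpression.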
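The branching in apply can be summarized as a small decision function. The sketch below is a simplified, hypothetical model: the ADT names (PruningFilter, ReuseBroadcast, NoPruning, AggregateSubquery) and the boolean inputs stand in for the real Spark classes and checks, and only the order of the decisions mirrors the Spark code.

```scala
// Hypothetical, simplified model of what PlanDynamicPruningFilters produces
// for one DynamicPruningSubquery (these are not Spark classes).
sealed trait PruningFilter
// BroadcastExchangeExec + SubqueryBroadcastExec + InSubqueryExec: reuse the join's broadcast.
final case class ReuseBroadcast(name: String) extends PruningFilter
// DynamicPruningExpression(Literal.TrueLiteral): every partition is kept.
case object NoPruning extends PruningFilter
// InSubquery over an Aggregate grouped by buildKeys(broadcastKeyIndex).
final case class AggregateSubquery(pruningKey: String) extends PruningFilter

object PlanDppSketch {
  // None models "the rule is skipped and the plan is returned unchanged".
  def plan(
      dppEnabled: Boolean,
      canReuseExchange: Boolean,
      onlyInBroadcast: Boolean,
      exprId: Long,
      buildKeys: Seq[String],
      broadcastKeyIndex: Int): Option[PruningFilter] =
    if (!dppEnabled) None
    else if (canReuseExchange) Some(ReuseBroadcast(s"dynamicpruning#$exprId"))
    else if (onlyInBroadcast) Some(NoPruning)
    else Some(AggregateSubquery(buildKeys(broadcastKeyIndex)))
}
```

The order matters: reuse is tried first because it costs nothing extra, and the aggregate subquery is only chosen when reuse is impossible and onlyInBroadcast is false.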
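The canReuseExchange check is a tree search for a broadcast hash join whose build side matches the build plan of the pruning subquery. Below is a minimal sketch on hypothetical plan nodes (Plan, Scan, BroadcastHashJoin and ReuseCheck are illustrative names, not Spark's classes). Spark's sameResult compares canonicalized plans; here it is replaced by plain structural equality, an assumption made for brevity.

```scala
// Hypothetical, simplified physical-plan nodes (not Spark's classes).
sealed trait Plan {
  def children: Seq[Plan]
  // Depth-first search returning the first node that satisfies f,
  // similar in spirit to TreeNode.find in Spark.
  def find(f: Plan => Boolean): Option[Plan] =
    if (f(this)) Some(this)
    else children.iterator.map(_.find(f)).collectFirst { case Some(p) => p }
}
final case class Scan(table: String, filter: Option[String]) extends Plan {
  def children: Seq[Plan] = Nil
}
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide
final case class BroadcastHashJoin(buildSide: BuildSide, left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object ReuseCheck {
  // Stand-in for SparkPlan.sameResult: structural equality instead of canonicalization.
  def sameResult(a: Plan, b: Plan): Boolean = a == b

  // Only the build side of a broadcast hash join is compared with buildPlan.
  def canReuseExchange(
      plan: Plan,
      buildPlan: Plan,
      reuseEnabled: Boolean,
      buildKeysNonEmpty: Boolean): Boolean =
    reuseEnabled && buildKeysNonEmpty && plan.find {
      case BroadcastHashJoin(BuildLeft, left, _)   => sameResult(left, buildPlan)
      case BroadcastHashJoin(BuildRight, _, right) => sameResult(right, buildPlan)
      case _                                       => false
    }.isDefined
}
```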
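The benefit of the reuse path is that the build side is computed and broadcast once, and both the join and the pruning filter read the same value. A toy sketch of that idea, assuming a registry keyed by a canonical plan string; BroadcastRegistry is hypothetical and is not how Spark's ReuseExchange rule is implemented.

```scala
import scala.collection.mutable

// Hypothetical registry illustrating exchange reuse (not Spark's ReuseExchange rule):
// requests for a broadcast of the same canonical plan share one computed value.
final class BroadcastRegistry {
  private val cache = mutable.Map.empty[String, Set[Int]]
  private var computed = 0

  // How many times a build side was actually computed and "broadcast".
  def computeCount: Int = computed

  // `compute` is by-name, so it only runs on the first request for this plan.
  def broadcast(canonicalPlan: String)(compute: => Set[Int]): Set[Int] =
    cache.getOrElseUpdate(canonicalPlan, { computed += 1; compute })
}
```

Two requests for the same dimension plan, one from the join and one from the pruning filter, return the same key set while computeCount stays at 1.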
That completes our analysis of how dynamic partition pruning is optimized in the physical plan.