
[Spark Highlights] The EliminateOuterJoin Optimization Rule

2023-10-09  熊本极客

1. Where the EliminateOuterJoin rule applies

Question: Why eliminate outer joins at all?
Answer: Eliminating an outer join improves execution efficiency. An inner join keeps only the rows that match on both sides, a left join must also keep every row of the left table, a right join every row of the right table, and a full join all rows of both tables. Ranked by how little data they have to retain and process, the four join types are: inner join > left join = right join > full join.
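The ranking is easy to verify empirically. Below is a minimal sketch (assuming a local SparkSession; the tiny tables, their values, and the object name are made up for illustration) that counts how many rows each join type has to retain:

import org.apache.spark.sql.SparkSession

object JoinRetentionDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()
  import spark.implicits._

  val left  = Seq(1, 2, 3).toDF("id")   // ids 2 and 3 have matches on the right
  val right = Seq(2, 3, 4).toDF("id")   // ids 2 and 3 have matches on the left

  // Prints: inner -> 2 rows, left -> 3, right -> 3, full -> 4.
  // The more unmatched rows a join type must keep, the more data it processes.
  for (joinType <- Seq("inner", "left", "right", "full")) {
    val n = left.join(right, Seq("id"), joinType).count()
    println(s"$joinType join keeps $n rows")
  }

  spark.stop()
}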

2. EliminateOuterJoin source code walkthrough

  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

    join.joinType match {
      case RightOuter if leftHasNonNullPredicate => Inner   // 1. RightOuter with a null-rejecting predicate on the left side: convert to Inner
      case LeftOuter if rightHasNonNullPredicate => Inner   // 2. LeftOuter with a null-rejecting predicate on the right side: convert to Inner
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner   // 3. FullOuter with null-rejecting predicates on both sides: convert to Inner
      case FullOuter if leftHasNonNullPredicate => LeftOuter   // 4. FullOuter with a null-rejecting predicate on the left side only: convert to LeftOuter
      case FullOuter if rightHasNonNullPredicate => RightOuter   // 5. FullOuter with a null-rejecting predicate on the right side only: convert to RightOuter
      case o => o
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Match a Filter whose direct child is an outer Join (RightOuter, LeftOuter or FullOuter)
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      // If the join type is unchanged, the plan does not qualify for this optimization
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))   // otherwise rebuild the Filter over a Join with the new join type
  }
}
[Figure: EliminateOuterJoin execution flow]
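The two Inner-conversion branches are demonstrated in question 1 below; the three FullOuter branches can be observed the same way. A rough spark-shell sketch, assuming the same employees/departments tables as in the example of question 1:

// Case 4: a null-rejecting predicate on the left side only
// downgrades FULL OUTER to LEFT OUTER.
spark.sql(
  """SELECT * FROM employees e FULL JOIN departments d
    |ON e.dept_id = d.dept_id
    |WHERE e.dept_id < 200""".stripMargin)
  .explain(true)   // optimized plan shows: Join LeftOuter

// Case 3: null-rejecting predicates on both sides downgrade it to INNER.
spark.sql(
  """SELECT * FROM employees e FULL JOIN departments d
    |ON e.dept_id = d.dept_id
    |WHERE e.dept_id < 200 AND d.dept_id < 200""".stripMargin)
  .explain(true)   // optimized plan shows: Join Inner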

Question 1: Why is a left outer join converted to an inner join when there is a filter condition on its right table?
Answer: The defining behavior of a left join is that when the right table has no matching row, its columns are padded with null. As shown below, if the right table carries a condition such as a < 1, every null-padded row fails a < 1 and gets filtered out anyway, so the query is equivalent to an inner join.

spark-sql> explain extended SELECT * FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id where departments.dept_id < 200;

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('departments.dept_id < 200)
   +- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
      :- 'UnresolvedRelation [employees], [], false
      +- 'UnresolvedRelation [departments], [], false

== Analyzed Logical Plan ==
emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
+- Filter (dept_id#105 < 200)
   +- Join LeftOuter, (dept_id#104 = dept_id#105)
      :- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
      :  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
      +- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
         +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Optimized Logical Plan ==
Join Inner, (dept_id#104 = dept_id#105)
:- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
:  +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:     +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
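
Conversely, a predicate that null can satisfy is not null-rejecting (canFilterOutNull returns false), so the outer join has to stay. A quick counter-check on the same tables:

// IS NULL is satisfied precisely by the null-padded rows that the
// LEFT JOIN produces, so eliminating the outer join would change the result.
spark.sql(
  """SELECT * FROM employees e LEFT JOIN departments d
    |ON e.dept_id = d.dept_id
    |WHERE d.dept_id IS NULL""".stripMargin)
  .explain(true)   // optimized plan still shows: Join LeftOuter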

Question 2: Why must the rule EliminateOuterJoin run before predicate pushdown (PushDownPredicates)?

  // Optimizer
  def defaultBatches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,   // eliminate outer joins
        PushDownPredicates,   // predicate pushdown
        // ...
  }

Answer: Predicate pushdown moves filter conditions as close to the data source as possible so that the scan can skip irrelevant data. EliminateOuterJoin, by contrast, pattern-matches a Filter sitting directly on top of a Join and decides the conversion from the original position of the filter condition. If PushDownPredicates ran first, part of the filter could already have been moved below the join, the Filter(Join(...)) pattern would no longer match, and the chance to convert the outer join would be lost. Running the elimination first also pays off afterwards: once the join has become Inner, pushdown may move predicates into both sides of the join, as the sketch below shows.
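The interplay can be observed by applying the two rules by hand. A rough spark-shell sketch against the employees/departments tables from question 1 (the real optimizer drives both rules inside a fixed-point batch, so this is only illustrative):

import org.apache.spark.sql.catalyst.optimizer.{EliminateOuterJoin, PushDownPredicates}

val analyzed = spark.sql(
  """SELECT * FROM employees e LEFT JOIN departments d
    |ON e.dept_id = d.dept_id
    |WHERE d.dept_id < 200""".stripMargin)
  .queryExecution.analyzed

// 1. The Filter still sits directly above the Join, so the
//    Filter(Join(...)) pattern matches and LeftOuter becomes Inner.
val eliminated = EliminateOuterJoin(analyzed)

// 2. Only now that the join is Inner may the dept_id predicate be
//    pushed below the join, next to the departments scan.
val pushed = PushDownPredicates(eliminated)
println(pushed.treeString)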
