[Spark Series 10] Spark LogicalPlan Statistics
Background
This article is based on Spark 3.0.1.
Analysis
Statistics at the logical-plan stage matter a great deal for logical-plan optimizations, such as BroadcastHashJoin selection and dynamic partition pruning. This article analyzes how Spark obtains these statistics.
Let's go straight to LogicalPlanStats:
trait LogicalPlanStats { self: LogicalPlan =>

  /**
   * Returns the estimated statistics for the current logical plan node. Under the hood, this
   * method caches the return value, which is computed based on the configuration passed in the
   * first time. If the configuration changes, the cache can be invalidated by calling
   * [[invalidateStatsCache()]].
   */
  def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }

  /** A cache for the estimated statistics, such that it will only be computed once. */
  protected var statsCache: Option[Statistics] = None

  /** Invalidates the stats cache. See [[stats]] for more information. */
  final def invalidateStatsCache(): Unit = {
    statsCache = None
    children.foreach(_.invalidateStatsCache())
  }
}
The stats method computes the plan's Statistics: if CBO is enabled, it uses BasicStatsPlanVisitor.visit; otherwise it calls SizeInBytesOnlyStatsPlanVisitor.visit. We can focus on SizeInBytesOnlyStatsPlanVisitor, since many of BasicStatsPlanVisitor's methods delegate to it. Its default method is worth a close look:
override def default(p: LogicalPlan): Statistics = p match {
  case p: LeafNode => p.computeStats()
  case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
}
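To make this concrete, here is a minimal sketch of triggering the computation (not from the original article; it assumes a local SparkSession named spark):

val plan = spark.range(0, 1000).toDF("id").queryExecution.optimizedPlan

// CBO is off by default (spark.sql.cbo.enabled=false), so this call goes
// through SizeInBytesOnlyStatsPlanVisitor; the result is cached in statsCache.
println(plan.stats.sizeInBytes)

// After flipping the config, the cache must be cleared explicitly,
// otherwise the old estimate keeps being returned:
// spark.conf.set("spark.sql.cbo.enabled", "true")
// plan.invalidateStatsCache()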
Since statistics are propagated upward layer by layer from the leaf nodes, a leaf node's computeStats method is invoked directly when it is matched. Its behavior differs between data source versions:
- For v1 data sources, take HiveTableRelation as an example:
override def computeStats(): Statistics = {
  tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
    .orElse(tableStats)
    .getOrElse {
      throw new IllegalStateException("table stats must be specified.")
    }
}
It reads the statistics directly from the table metadata; if CBO or plan-stats is enabled, it also fetches the row count and per-column statistics (a sketch of populating these with ANALYZE TABLE appears at the end of this article).
- For v2 data sources, take DataSourceV2Relation as an example:
override def computeStats(): Statistics = {
  if (Utils.isTesting) {
    // when testing, throw an exception if this computeStats method is called because stats should
    // not be accessed before pushing the projection and filters to create a scan. otherwise, the
    // stats are not accurate because they are based on a full table scan of all columns.
    throw new IllegalStateException(
      s"BUG: computeStats called before pushdown on DSv2 relation: $name")
  } else {
    // when not testing, return stats because bad stats are better than failing a query
    table.asReadable.newScanBuilder(options) match {
      case r: SupportsReportStatistics =>
        val statistics = r.estimateStatistics()
        DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
      case _ =>
        Statistics(sizeInBytes = conf.defaultSizeInBytes)
    }
  }
}
It calls table.asReadable.newScanBuilder directly; if the result implements SupportsReportStatistics, its estimateStatistics method is called. The Table, SupportsRead, and SupportsReportStatistics interfaces involved here are all new in Spark 3. Let's look at ParquetScan, which by default inherits FileScan's estimateStatistics method (a sketch of a custom scan reporting its own statistics follows the code):
override def estimateStatistics(): Statistics = {
  new Statistics {
    override def sizeInBytes(): OptionalLong = {
      val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
      val size = (compressionFactor * fileIndex.sizeInBytes).toLong
      OptionalLong.of(size)
    }

    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}
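For comparison, here is a minimal, hypothetical sketch (the class name MyScan and the numbers are ours, not Spark's) of a custom DSv2 scan that reports its own statistics through SupportsReportStatistics:

import java.util.OptionalLong
import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

// Hypothetical DSv2 scan that reports a known size and row count,
// letting the optimizer consider e.g. a broadcast join for this source.
class MyScan(schema: StructType) extends Scan with SupportsReportStatistics {
  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    // Total size of the underlying data in bytes
    override def sizeInBytes(): OptionalLong = OptionalLong.of(10L * 1024 * 1024)
    // Row count, if the source can provide it cheaply
    override def numRows(): OptionalLong = OptionalLong.of(100000L)
  }
}

Note also that FileScan's estimate above scales the on-disk size by spark.sql.sources.fileCompressionFactor (default 1.0); for heavily compressed formats such as Parquet, raising this factor gives a more realistic in-memory size estimate.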
We can see that v2 data sources carry no column-level statistics, at least for now, while some v1 sources do; after all, collecting statistics for every column is expensive.
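To close, here is the sketch promised earlier of how v1 column statistics get populated and read back (the table name stats_demo and its columns are ours; this assumes a catalog-backed SparkSession named spark):

spark.sql("CREATE TABLE stats_demo (id INT, name STRING) USING parquet")
spark.sql("ANALYZE TABLE stats_demo COMPUTE STATISTICS FOR COLUMNS id, name")

// With CBO enabled, toPlanStats also converts the per-column catalog stats.
spark.conf.set("spark.sql.cbo.enabled", "true")
val stats = spark.table("stats_demo").queryExecution.optimizedPlan.stats

// attributeStats holds a ColumnStat per attribute (distinct count, min/max, null count, ...)
stats.attributeStats.foreach { case (attr, colStat) =>
  println(s"${attr.name}: distinctCount=${colStat.distinctCount}")
}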