[Spark Series 8] How Spark Delta Reads Data: An Implementation Analysis
Background
This article is based on Delta 0.7.0 and Spark 3.0.1.
In the previous posts of this series we analyzed the Delta write path: the groundwork for write-side ACID transactions (the file-writing base classes FileFormat/FileCommitProtocol) and the write-side ACID transaction implementation itself. This time we look at how Delta reads data.
Analysis
The construction of Spark's Delta data source starts in DataSource.lookupDataSourceV2 and then flows into loadV1Source, where dataSource.createRelation is called to build the data source's Relation. Let's jump straight to DeltaDataSource.createRelation:
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  val maybePath = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })

  // Log any invalid options that are being passed in
  DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))

  val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)

  DeltaTableV2(
    sqlContext.sparkSession,
    new Path(maybePath),
    timeTravelOpt = timeTravelByParams).toBaseRelation
}
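To make the entry point concrete, here is a minimal read sketch (the table path /tmp/delta/events is hypothetical, and running it requires the delta-core package on the classpath). Everything passed to the DataFrame reader, including the load path, ends up in the parameters map that createRelation receives above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-read-example")
  .master("local[*]")
  .getOrCreate()

// The load path becomes parameters("path"), and any .option(...) calls are
// merged into the same map before DeltaOptions.verifyOptions sees them.
val df = spark.read
  .format("delta")
  .load("/tmp/delta/events")

df.show()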
- DeltaOptions.verifyOptions validates the passed-in options (a small illustrative check follows the key list below); the valid option keys are:
val validOptionKeys: Set[String] = Set(
  REPLACE_WHERE_OPTION,
  MERGE_SCHEMA_OPTION,
  EXCLUDE_REGEX_OPTION,
  OVERWRITE_SCHEMA_OPTION,
  USER_METADATA_OPTION,
  MAX_FILES_PER_TRIGGER_OPTION,
  IGNORE_FILE_DELETION_OPTION,
  IGNORE_CHANGES_OPTION,
  IGNORE_DELETES_OPTION,
  OPTIMIZE_WRITE_OPTION,
  DATA_CHANGE_OPTION,
  "queryName",
  "checkpointLocation",
  "path",
  "timestampAsOf",
  "versionAsOf"
)
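For intuition only, here is a tiny sketch of checking user options against such a whitelist in a case-insensitive way. This is not the actual DeltaOptions.verifyOptions implementation, and the helper name findInvalidOptions is made up for this example:

// Illustrative helper, not part of Delta: report option keys missing from the whitelist.
def findInvalidOptions(options: Map[String, String], validKeys: Set[String]): Set[String] = {
  val valid = validKeys.map(_.toLowerCase)
  options.keySet.filterNot(k => valid.contains(k.toLowerCase))
}

// findInvalidOptions(Map("versionAsOf" -> "3", "mergeSchma" -> "true"),
//                    Set("versionAsOf", "mergeSchema"))
// returns Set("mergeSchma"), flagging the misspelled key.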
- DeltaDataSource.getTimeTravelVersion resolves the version to read from the timestampAsOf or versionAsOf option, if one was specified
- DeltaTableV2's toBaseRelation method is then invoked directly:
def toBaseRelation: BaseRelation = {
  if (deltaLog.snapshot.version == -1) {
    val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))
      .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))
    throw DeltaErrors.notADeltaTableException(id)
  }
  val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
    path.toString, deltaLog.snapshot, partitionFilters)

  // TODO(burak): We should pass in the snapshot here
  deltaLog.createRelation(partitionPredicates, timeTravelSpec)
}
- If partition filters are present, DeltaDataSource.verifyAndCreatePartitionFilters builds the partitionPredicates
- For timeTravelSpec, the user-specified timeTravelByParams takes priority; otherwise DeltaDataSource.parsePathIdentifier derives the version from the path, using a format such as /some/path/partition=1@v1234 or /some/path/partition=1@yyyyMMddHHmmssSSS (a short time-travel example follows the createRelation listing below)
- deltaLog.createRelation is then called directly:
def createRelation(
    partitionFilters: Seq[Expression] = Nil,
    timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation = {

  val versionToUse = timeTravel.map { tt =>
    val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(
      spark.sessionState.conf, this, tt)
    val source = tt.creationSource.getOrElse("unknown")
    recordDeltaEvent(this, s"delta.timeTravel.$source", data = Map(
      "tableVersion" -> snapshot.version,
      "queriedVersion" -> version,
      "accessType" -> accessType
    ))
    version
  }

  /** Used to link the files present in the table into the query planner. */
  val snapshotToUse = versionToUse.map(getSnapshotAt(_)).getOrElse(snapshot)
  val fileIndex = TahoeLogFileIndex(
    spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse)

  new HadoopFsRelation(
    fileIndex,
    partitionSchema = snapshotToUse.metadata.partitionSchema,
    dataSchema = snapshotToUse.metadata.schema,
    bucketSpec = None,
    snapshotToUse.fileFormat,
    snapshotToUse.metadata.format.options)(spark) with InsertableRelation {

    def insert(data: DataFrame, overwrite: Boolean): Unit = {
      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
      WriteIntoDelta(
        deltaLog = DeltaLog.this,
        mode = mode,
        new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
        partitionColumns = Seq.empty,
        configuration = Map.empty,
        data = data).run(spark)
    }
  }
}
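As a concrete illustration of the two time-travel entry points mentioned above, here is a sketch assuming the SparkSession named spark from the earlier example and the same hypothetical table path:

// Time travel via a reader option (picked up by DeltaDataSource.getTimeTravelVersion):
val byOption = spark.read.format("delta")
  .option("versionAsOf", 3)
  .load("/tmp/delta/events")

// Time travel encoded in the path itself (picked up by DeltaDataSource.parsePathIdentifier),
// e.g. "<path>@v1234" for a version or "<path>@yyyyMMddHHmmssSSS" for a timestamp:
val byPath = spark.read.format("delta")
  .load("/tmp/delta/events@v3")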
- Get the snapshot corresponding to the requested version
- Build the TahoeLogFileIndex. Since a HadoopFsRelation is constructed here, we focus on TahoeLogFileIndex's inputFiles method:
override def inputFiles: Array[String] = {
  getSnapshot(stalenessAcceptable = false).filesForScan(
    projection = Nil, partitionFilters)
    .files.map(f => absolutePath(f.path).toString).toArray
}
This method calls the snapshot's filesForScan method:
def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = {
  implicit val enc = SingleAction.addFileEncoder
  val partitionFilters = filters.flatMap { filter =>
    DeltaTableUtils.splitMetadataAndDataPredicates(filter, metadata.partitionColumns, spark)._1
  }
  val files = DeltaLog.filterFileList(
    metadata.partitionSchema,
    allFiles.toDF(),
    partitionFilters).as[AddFile].collect()
  DeltaScan(version = version, files, null, null, null)(null, null, null, null)
}
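Conceptually, DeltaLog.filterFileList evaluates the partition predicates against each file's partitionValues map recorded in the log, so whole partitions can be pruned without listing any data files. A simplified, self-contained sketch of that idea follows; FileEntry is a made-up stand-in for AddFile, not the real class, and a SparkSession named spark is assumed (e.g. in spark-shell):

import org.apache.spark.sql.functions.col
import spark.implicits._

// Made-up stand-in for AddFile: a file path plus its partition values, as stored in the log.
case class FileEntry(path: String, partitionValues: Map[String, String])

val files = Seq(
  FileEntry("date=2020-10-01/part-000.parquet", Map("date" -> "2020-10-01")),
  FileEntry("date=2020-10-02/part-001.parquet", Map("date" -> "2020-10-02"))
).toDS()

// A partition predicate only looks at the partitionValues map, never at the data itself,
// so files belonging to other partitions are skipped entirely.
val pruned = files.where(col("partitionValues")("date") === "2020-10-01")
pruned.show(false)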
From the earlier articles in this series we know that the delta log records AddFile and RemoveFile actions, so which files should a read actually scan? The answer lies entirely in the allFiles method; let's take a closer look at it:
def allFiles: Dataset[AddFile] = {
  val implicits = spark.implicits
  import implicits._
  state.where("add IS NOT NULL").select($"add".as[AddFile])
}
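The state Dataset holds wrapped actions in which at most one column per row is non-null, which is why filtering on add IS NOT NULL is enough to recover the AddFile entries. Here is a tiny, self-contained illustration of that shape; WrappedAction, AddInfo and RemoveInfo are made-up names, not the real SingleAction schema, and a SparkSession named spark is again assumed:

import spark.implicits._

// Made-up wrapper: each row carries at most one non-null action, mimicking SingleAction.
case class AddInfo(path: String)
case class RemoveInfo(path: String)
case class WrappedAction(add: Option[AddInfo], remove: Option[RemoveInfo])

val state = Seq(
  WrappedAction(Some(AddInfo("part-000.parquet")), None),
  WrappedAction(None, Some(RemoveInfo("part-001.parquet")))
).toDS()

// Same pattern as allFiles: keep only the rows whose `add` column is non-null.
val adds = state.where("add IS NOT NULL").select($"add.*").as[AddInfo]
adds.show(false)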
allFiles reads from the state method, which in turn is built from stateReconstruction:
private lazy val cachedState =
  cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")

/** The current set of actions in this [[Snapshot]]. */
def state: Dataset[SingleAction] = cachedState.getDS
The stateReconstruction method is used when writing checkpoints and is also used here; its job is to reconstruct the table's file state by merging the AddFile and RemoveFile actions:
private def stateReconstruction: Dataset[SingleAction] = {
  ...
  loadActions.mapPartitions { actions =>
    val hdpConf = hadoopConf.value.value
    actions.flatMap {
      _.unwrap match {
        case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
        case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
        case other if other == null => None
        case other => Some(other.wrap)
      }
    }
  }
  ...
  .mapPartitions { iter =>
    val state = new InMemoryLogReplay(time)
    state.append(0, iter.map(_.unwrap))
    state.checkpoint.map(_.wrap)
  }
}
The key lies in InMemoryLogReplay's append and checkpoint methods, which perform the merging of the file state:
def append(version: Long, actions: Iterator[Action]): Unit = {
  assert(currentVersion == -1 || version == currentVersion + 1,
    s"Attempted to replay version $version, but state is at $currentVersion")
  currentVersion = version
  actions.foreach {
    case a: SetTransaction =>
      transactions(a.appId) = a
    case a: Metadata =>
      currentMetaData = a
    case a: Protocol =>
      currentProtocolVersion = a
    case add: AddFile =>
      activeFiles(add.pathAsUri) = add.copy(dataChange = false)
      // Remove the tombstone to make sure we only output one `FileAction`.
      tombstones.remove(add.pathAsUri)
    case remove: RemoveFile =>
      activeFiles.remove(remove.pathAsUri)
      tombstones(remove.pathAsUri) = remove.copy(dataChange = false)
    case ci: CommitInfo => // do nothing
    case null => // Some crazy future feature. Ignore
  }
}
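To see why this replay yields exactly the live files, here is a stripped-down, Spark-free sketch of the same bookkeeping. It is illustrative only; Add, Remove and replay are made-up names, not the real Delta classes:

// Illustrative only: replay a sequence of file actions and keep the surviving paths.
sealed trait FileAction { def path: String }
case class Add(path: String) extends FileAction
case class Remove(path: String) extends FileAction

def replay(actions: Seq[FileAction]): Set[String] = {
  val active = scala.collection.mutable.LinkedHashMap.empty[String, Add]
  actions.foreach {
    case a: Add    => active(a.path) = a      // latest Add wins
    case r: Remove => active.remove(r.path)   // a Remove tombstones earlier Adds
  }
  active.keySet.toSet
}

// Example: f1 is added then removed, f2 survives, f3 is re-added after a removal.
val live = replay(Seq(
  Add("f1"), Add("f2"), Remove("f1"), Remove("f3"), Add("f3")))
// live == Set("f2", "f3")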
The crucial parts are the case add: AddFile and case remove: RemoveFile branches together with the checkpoint method: an AddFile activates a path (clearing any tombstone for it), while a RemoveFile drops it from the active set and records a tombstone, so the file state is merged cleanly.
filesForScan then calls collect and returns a DeltaScan; the file paths it contains become the paths to be read.
- Finally, the TahoeLogFileIndex is passed into HadoopFsRelation and the resulting BaseRelation is returned
Note: the overall flow of reading the Delta format in Spark is the same as reading any other file-based format; the main difference is that before the data is read, the file state is merged once in memory, so only files whose latest state is an AddFile need to be scanned.