Spark storage action operators: a source code walkthrough
2022-02-09
Tim在路上
- saveAsHadoopFile
Writes the RDD to any Hadoop-supported file system.
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  // Configure hadoopConf: set the key, value, and output format classes
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  // Configure compression
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }
  // Configure the output committer
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  // Delegate to saveAsHadoopDataset
  saveAsHadoopDataset(hadoopConf)
}
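As the source shows, saveAsHadoopFile takes the output path, the key class, the value class, the output format class, a Hadoop configuration, and an optional compression codec. It writes these settings into the JobConf and then calls saveAsHadoopDataset.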
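Before moving on to saveAsHadoopDataset, it may help to see the call from the caller's side. The following is a minimal sketch, not taken from the original article: the object name, the temporary output directory, and the choice of Text, IntWritable, TextOutputFormat, and GzipCodec are all assumptions made for illustration. It uses the overload that takes the codec class directly.

```scala
import java.nio.file.Files

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; names and paths are illustrative only.
object SaveAsHadoopFileSketch {
  // Writes two pairs as gzip-compressed text and reads the lines back.
  def run(): Seq[String] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SaveAsHadoopFileSketch").setMaster("local[2]"))
    try {
      // The output directory must not exist yet, so use a fresh child of a temp directory.
      val out = Files.createTempDirectory("hadoop-file-sketch").resolve("out").toString
      sc.parallelize(Seq("a" -> 1, "b" -> 2))
        .map { case (k, v) => (new Text(k), new IntWritable(v)) }
        .saveAsHadoopFile(
          out,
          classOf[Text],                                // keyClass
          classOf[IntWritable],                         // valueClass
          classOf[TextOutputFormat[Text, IntWritable]], // outputFormatClass
          classOf[GzipCodec])                           // codec
      // textFile decompresses the .gz part files transparently.
      sc.textFile(out).collect().sorted.toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

TextOutputFormat separates key and value with a tab by default, so the lines read back should be the key, a tab, and the value. Passing the three classes explicitly is exactly what the simplified overloads discussed later avoid.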
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}
This method in turn calls SparkHadoopWriter.write.
def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val commitJobId = rdd.id
  // Set up a job: prepare and create the job committer.
  val jobTrackerId = createJobTrackerID(new Date())
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)
  // Assert the output format/key/value class is set in JobConf.
  config.assertConf(jobContext, rdd.conf)
  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)
  // Try to write all RDD partitions as a Hadoop OutputFormat.
  try {
    val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
      // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
      // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
      val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
      // Runs on the task side, once per partition
      executeTask(
        context = context,
        config = config,
        jobTrackerId = jobTrackerId,
        commitJobId = commitJobId,
        sparkPartitionId = context.partitionId,
        sparkAttemptNumber = attemptId,
        committer = committer,
        iterator = iter)
    })
    // Commit the job
    committer.commitJob(jobContext, ret)
    logInfo(s"Job ${jobContext.getJobID} committed.")
  } catch {
    case cause: Throwable =>
      logError(s"Aborting job ${jobContext.getJobID}.", cause)
      committer.abortJob(jobContext)
      throw new SparkException("Job aborted.", cause)
  }
}
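Its main work is as follows. On the driver, it prepares the output format and the Hadoop configuration and sets up the job through the committer. It then submits a Spark job in which executeTask runs on every partition of the RDD, and each task writes out all rows of its partition. If the tasks for all partitions write and commit successfully, the committer commits the job; if any of them fails, the job is aborted.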
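The set up, run, then commit-or-abort shape of write can be sketched without Spark. The snippet below is only a toy model under stated assumptions: RecordingCommitter and this write function are hypothetical stand-ins invented for illustration, not Spark or Hadoop classes, and tasks run sequentially here rather than on executors.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object CommitProtocolSketch {
  // Hypothetical stand-in for the committer; it only records the calls it receives.
  final class RecordingCommitter {
    val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def setupJob(): Unit = calls += "setupJob"
    def commitJob(taskResults: Seq[String]): Unit =
      calls += s"commitJob(${taskResults.mkString(",")})"
    def abortJob(): Unit = calls += "abortJob"
  }

  // Same control flow as SparkHadoopWriter.write: set up the job, run one task
  // per partition, commit if every task succeeds, otherwise abort and rethrow.
  def write(partitions: Seq[Seq[Int]], committer: RecordingCommitter)(
      task: Seq[Int] => String): Unit = {
    committer.setupJob()
    try {
      val results = partitions.map(task)
      committer.commitJob(results)
    } catch {
      case cause: Throwable =>
        committer.abortJob()
        throw new RuntimeException("Job aborted.", cause)
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = new RecordingCommitter
    write(Seq(Seq(1, 2), Seq(3)), ok)(rows => s"wrote ${rows.size}")
    println(ok.calls.mkString(" -> ")) // prints setupJob -> commitJob(wrote 2,wrote 1)

    val failing = new RecordingCommitter
    val outcome = Try(write(Seq(Seq(1), Seq.empty), failing) { rows =>
      require(rows.nonEmpty, "simulated task failure")
      "wrote"
    })
    println(failing.calls.mkString(" -> ")) // prints setupJob -> abortJob
    println(outcome.isFailure)              // prints true
  }
}
```

In the real code the per-task results passed to commitJob are the task commit messages returned by executeTask, which is what lets the driver finalize only output that tasks actually committed.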
saveAsHadoopFile also has simplified overloads, in which most of the arguments are derived by the program itself.
def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
  val runtimeClass = fm.runtimeClass
  saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
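Here the caller only has to pass the output path (plus, in this overload, the compression codec). The output format class is recovered from the type parameter F through its ClassTag, and the key and value classes come from the two members below.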
private[spark] def keyClass: Class[_] = kt.runtimeClass
private[spark] def valueClass: Class[_] = vt.runtimeClass
Both keyClass and valueClass are obtained at runtime from the ClassTags (kt and vt) of the RDD's key and value types.
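How a ClassTag yields a runtime class can be shown in isolation. This is a minimal sketch assuming a hypothetical PairFunctions class that plays the role of PairRDDFunctions; only the kt and vt parameter names and the two one-line members are borrowed from the source above.

```scala
import scala.reflect.ClassTag

object ClassTagSketch {
  // Hypothetical stand-in for PairRDDFunctions: the compiler supplies the
  // ClassTags kt and vt implicitly at the construction site.
  final class PairFunctions[K, V](pairs: Seq[(K, V)])(
      implicit kt: ClassTag[K], vt: ClassTag[V]) {
    def keyClass: Class[_] = kt.runtimeClass
    def valueClass: Class[_] = vt.runtimeClass
  }

  def main(args: Array[String]): Unit = {
    // K = String and V = Int are inferred, and so are their ClassTags.
    val fns = new PairFunctions(Seq("a" -> 1, "b" -> 2))
    println(fns.keyClass)   // prints class java.lang.String
    println(fns.valueClass) // prints int
  }
}
```

Because the tags are resolved by the compiler, the caller never passes a Class object explicitly, which is what makes the shorter saveAsHadoopFile overloads possible.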
- saveAsTextFile
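Saves the RDD as text files on any Hadoop-supported file system, writing each element as its string representation. It is also a simplified wrapper around saveAsHadoopFile.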
def saveAsTextFile(path: String): Unit = withScope {
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
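From the caller's side this reduces to a single call. The round trip below is a small sketch under assumptions: the object name, the temporary directory, and the sample data are made up for illustration and do not come from the original article.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; names and paths are illustrative only.
object SaveAsTextFileSketch {
  // Saves two tuples as text and reads the resulting lines back.
  def run(): Seq[String] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SaveAsTextFileSketch").setMaster("local[2]"))
    try {
      // The output directory must not exist yet, so use a fresh child of a temp directory.
      val out = Files.createTempDirectory("text-file-sketch").resolve("out").toString
      // Each element becomes one line, produced by calling toString on it.
      sc.parallelize(Seq(1 -> "a", 2 -> "b"), numSlices = 2).saveAsTextFile(out)
      sc.textFile(out).collect().sorted.toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Since the elements here are tuples, each line is the tuple's toString form, for example (1,a). The output is a directory of part files, one per partition, rather than a single file.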
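- Other big data systems
As shown above, saveAsHadoopDataset takes a JobConf, and everything about the output (format, key and value classes, destination) is configured in that object. By configuring the JobConf accordingly, the same mechanism can write to other storage systems as well, for example HBase.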
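import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object HBaseWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val tableName = "XXX"
    val quorum = "localhost"
    val port = "2181"
    // Build the HBase configuration. HBaseUtils is a helper that is not shown in
    // this article; it is assumed to return a Hadoop Configuration.
    val conf = HBaseUtils.getHBaseConfiguration(quorum, port, tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // Base the JobConf on conf so that the settings above are actually used
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // Write the data to HBase
    val indataRDD = sc.makeRDD(Array("20180723_02,10", "20180723_03,10", "20180818_03,50"))
    indataRDD.map(_.split(",")).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      // addColumn is the non-deprecated replacement for Put.add (HBase 1.0+)
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("clict_count"), Bytes.toBytes(arr(1)))
      (new ImmutableBytesWritable, put)
    }.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}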