Spark 存储行动算子源码解析

2022-02-09  本文已影响0人  Tim在路上

输出RDD到任何支持Hadoop的文件系统

def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  // 1. 配置hadoopConf 将key,value,output类型进行设置
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  // 配置压缩
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }
  // 配置output的committer
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  // 调用saveAsHadoopDataset
  saveAsHadoopDataset(hadoopConf)
}

从源码可以看出saveAsHadoopFile的输入参数有path, key类型,value类型, 输出格式类型,hadoop配置,压缩类型。将输入的参数配置到JobConf中后,调用saveAsHadoopDataset。

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}

源码调用了SparkHadoopWriter.write方法。

def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val commitJobId = rdd.id

// Set up a job.  准备和创建一个commiter Job
  val jobTrackerId =createJobTrackerID(new Date())
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)

  // Assert the output format/key/value class is set in JobConf.
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  // Try to write all RDD partitions as a Hadoop OutputFormat.
  try {
    val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
      // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
      // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
      val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
提交到Task端执行
executeTask(
        context = context,
        config = config,
        jobTrackerId = jobTrackerId,
        commitJobId = commitJobId,
        sparkPartitionId = context.partitionId,
        sparkAttemptNumber = attemptId,
        committer = committer,
        iterator = iter)
    })
    // 提交job
    committer.commitJob(jobContext, ret)
    logInfo(s"Job${jobContext.getJobID} committed.")
  } catch {
    case cause: Throwable =>
      logError(s"Aborting job${jobContext.getJobID}.", cause)
      committer.abortJob(jobContext)
      throw new SparkException("Job aborted.", cause)
  }
}

其主要工作为,在Driver端为作业准备数据源和Hadoop的配置,提交一个Job, 并向RDD的每一个分区传入executeTask作为执行,其任务将每一个分区中的所有行进行写出。如果所有的分区task都成功写出,提交commitTask,则提交committer, 否则存在失败则终止。

saveAsHadoopFile 还存在一些简化版本,参数的传递时通过程序自己获取。

def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
  val runtimeClass = fm.runtimeClass
  saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}

从中可以看出我们只需传入存储路径。

private[spark] def keyClass: Class[_] = kt.runtimeClass

private[spark] def valueClass: Class[_] = vt.runtimeClass

keyClass和valueClass都是运行时转换获取。

将RDD存储的支持hadoop系统上的文本文件,以string形式存储,它也是saveAsHadoopFile的简化版。

def saveAsTextFile(path: String): Unit = withScope {
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag =implicitly[ClassTag[NullWritable]]
  val textClassTag =implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

从上文可以看出,在调用saveAsHadoopDataset时,传入的参数为Jobconf类型,实质是在其中配置相关类型,通过配置JobConf也可以实现其他系统的存储。

object HBaseWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val tableName = "XXX"
    val quorum = "localhost"
    val port = "2181"

    // 配置相关信息
    val conf = HBaseUtils.getHBaseConfiguration(quorum,port,tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)

    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,tableName)

    // 写入数据到HBase
    val indataRDD = sc.makeRDD(Array("20180723_02,10","20180723_03,10","20180818_03,50"))

    val rdd = indataRDD.map(_.split(",")).map{arr => {
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("info"),Bytes.toBytes("clict_count"),Bytes.toBytes(arr(1)))
      (new ImmutableBytesWritable,put)
    }}.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}
上一篇下一篇

猜你喜欢

热点阅读