spark

spark(六)深入理解spark-core:RDD的原理与源码

2018-09-16  本文已影响100人  文子轩

一.弹性分布式数据集(RDD)

本部分描述RDD和编程模型,首先讨论设计目标,然后定义RDD,讨论Spark的编程模型,并给出一个示例,最后对比RDD与分布式共享内存

1.RDD拥有的优势特性:自动容错、位置感知性调度和可伸缩性。
RDD比数据流模型更易于编程,同时基于工作集的计算也具有良好的描述能力
2.分布式的数据集的容错性有两种方式:即数据检查点和记录数据的更新。
我们面向的是大规模数据分析,数据检查点操作成本很高.
3.RDD只支持粗粒度转换,即在大量记录上执行的单个操作。
将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。
4.但我们发现RDD仍然可以很好地适用于很多应用,特别是支持数据并行的批量分析应用,
包括数据挖掘、机器学习、图算法等,因为这些程序通常都会在很多记录上执行相同的操作

RDD是只读的,分区一记录的集合.RDD只能基于在稳定的物理存储中的数据集和其他已有的RDD上执行确定性操作来创建.这些确定的操作称为转换,如map,filter,groupby,join(转换不是程序开发人员在RDD上执行的操作注:这句话的意思可能是,转换操作并不会触发RDD真正的action。由于惰性执行,当进行action操作的时候,才会回溯去执行前面的转换操作))

1.定义RDD之后,程序员就可以在动作(注:即action操作)中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。

2.程序员还可以从两个方面控制RDD,即缓存和分区,用户可以请求将RDD缓存,这样运行的时候将已经计算好的RDD分区存储起来,以加速后期的重用缓存的RDD一般存储在内存中,但如果内存不够,可以写在磁盘上

  1. 另一方面,RDD还可以用户根据关键字(key)指定分区的顺序,例如reducebykey的消息,这是一个可选的功能,目前支持哈希分区和范围分区,例如,应用程序请求将两个RDD按照同样的哈希分区的方式进行分区(将同一机器尚具有相同的关键字记录放在一个分区中),以加速她们之间的JOIN操作,
photo

本部分我们通过一个具体示例来阐述RDD。假定有一个大型网站出错,操作员想要检查Hadoop文件系统(HDFS)中的日志文件(TB级大小)来找出原因。通过使用Spark,操作员只需将日志中的错误信息装载到一组节点的内存中,然后执行交互式查询。首先,需要在Spark解释器中输入如下Scala代码:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.cache()

第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的参数是一个闭包(什么是闭包?https://zhuanlan.zhihu.com/p/21346046)。

这时集群还没有开始执行任何任务。但是,用户已经可以在这个RDD上执行对应的动作,例如统计错误消息的数目:

errors.count()

用户还可以在RDD尚执行更多的转换操作,并使用转换结果,如:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
    .map(_.split('\t')(3))
    .collect()

使用errors的第一个action运行以后,Spark会把errors的分区缓存在内存中,极大地加快了后续计算速度。注意,最初的RDD lines不会被缓存。因为错误信息可能只占原数据集的很小一部分(小到足以放入内存)。

最后,为了说明模型的容错性,图1给出了第3个查询的Lineage图。在lines RDD上执行filter操作,得到errors,然后再filter、map后得到新的RDD,在这个RDD上执行collect操作。Spark调度器以流水线的方式执行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外,如果某个errors分区丢失,Spark只在相应的lines分区上执行filter操作来重建该errors分区。

photo2

为了进一步理解RDD是一种分布式的内存抽象,表1列出了RDD与分布式共享内存(DSM,Distributed Shared Memory)[24]的对比。在DSM系统中,应用可以向全局地址空间的任意位置进行读写操作。(注意这里的DSM,不仅指传统的共享内存系统,还包括那些通过分布式哈希表或分布式文件系统进行数据共享的系统,比如Piccolo[28](注:Spark生态系统中有一名为Alluxio的分布式内存文件系统,它通常可作为Spark和HDFS的中间层存在 ))DSM是一种通用的抽象,但这种通用性同时也使得在商用集群上实现有效的容错性更加困难。

RDD与DSM主要区别在于,不仅可以通过批量转换创建(即“写”)RDD,还可以对任意内存位置读写。也就是说,RDD限制应用执行批量写操作,这样有利于实现有效的容错。特别地,RDD没有检查点开销,因为可以使用Lineage来恢复RDD。而且,失效时只需要重新计算丢失的那些RDD分区,可以在不同节点上并行执行,而不需要回滚整个程序。

图片.png

第一,对于RDD中的批量操作,运行时将根据数据存放的位置来调度任务,从而提高性能。第二,对于基于扫描的操作,如果内存不足以缓存整个RDD,就进行部分缓存。把内存放不下的分区存储到磁盘上,此时性能与现有的数据流系统差不多。

最后看一下读操作的粒度。RDD上的很多动作(如count和collect)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD也支持细粒度操作,即在哈希或范围分区的RDD上执行关键字查找。

二.spark编程接口

要使用Spark,开发者需要编写一个driver程序,连接到集群以运行Worker。Driver定义了一个或多个RDD,并调用RDD上的动作。Worker是长时间运行的进程,将RDD分区以Java对象的形式缓存在内存中。

photo3

用户执行RDD操作时会提供参数,比如map传递一个闭包(closure,函数式编程中的概念)。Scala将闭包表示为Java对象,如果传递的参数是闭包,则这些对象被序列化,通过网络传输到其他节点上进行装载。Scala将闭包内的变量保存为Java对象的字段。例如,var x = 5; rdd.map(_ + x) 这段代码将RDD中的每个元素加5。总的来说,Spark的语言集成类似于DryadLINQ。

RDD本身是静态类型对象,由参数指定其元素类型。例如,RDD[int]是一个整型RDD。不过,我们举的例子几乎都省略了这个类型参数,因为Scala支持类型推断。

虽然在概念上使用Scala实现RDD很简单,但还是要处理一些Scala闭包对象的反射问题。如何通过Scala解释器来使用Spark还需要更多工作,这点我们将在第6部分讨论。不管怎样,我们都不需要修改Scala编译器。

注意,有些操作只对键值对可用,比如join。另外,函数名与Scala及其他函数式语言中的API匹配,例如map是一对一的映射,而flatMap是将每个输入映射为一个或多个输出(与MapReduce中的map类似)。

除了这些操作以外,用户还可以请求将RDD缓存起来。而且,用户还可以通过Partitioner类获取RDD的分区顺序,然后将另一个RDD按照同样的方式分区。有些操作会自动产生一个哈希或范围分区的RDD,像groupByKey,reduceByKey和sort等

很多机器学习算法都具有迭代特性,运行迭代优化方法来优化某个目标函数,例如梯度下降方法。如果这些算法的工作集能够放入内存,将极大地加速程序运行。而且,这些算法通常采用批量操作,例如映射和求和,这样更容易使用RDD来表示。

例如下面的程序是逻辑回归[15]的实现。逻辑回归是一种常见的分类算法,即寻找一个最佳分割两组点(即垃圾邮件和非垃圾邮件)的超平面w。算法采用梯度下降的方法:开始时w为随机值,在每一次迭代的过程中,对w的函数求和,然后朝着优化的方向移动w。

val points = spark.textFile(...)
     .map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
     val gradient = points.map{ p =>
          p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
     }.reduce((a,b) => a+b)
     w -= gradient
}

已经在Spark中实现的迭代式机器学习算法还有:kmeans(像逻辑回归一样每次迭代时执行一对map和reduce操作),期望最大化算法(EM,两个不同的map/reduce步骤交替执行),交替最小二乘矩阵分解和协同过滤算法。Chu等人提出迭代式MapReduce也可以用来实现常用的学习算法。

窄依赖的每个child的RDD的paratition的生车费嗯操作是可以并行,而宽依赖则需要所有的parent partition shuffle结果得到后再进行
下面我们来看下
抽象类Dependency:

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

Dependency有两个子类,一个子类为窄依赖,NarrowDenpendency;一个为宽依赖shuffleDependency
NarrowDependency也是一个抽象类,它实现了getParents 重写了 rdd 函数,它有两个子类,一个是 OneToOneDependency,一个是 RangeDependency
*

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

OneToOneDependency,可以看到getParents实现很简单,就是传进一个partitionId: Int,再把partitionId放在List里面传出去,即去parent RDD 中取与该RDD 相同 partitionID的数据

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

*** RangeDependency,用于union。与上面不同的是,这里我们要算出该位置,设某个parent RDD 从 inStart 开始的partition,逐个生成了 child RDD 从outStart 开始的partition,则计算方式为: partitionId - outStart + inStart***

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

三.检查点

一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。这种情况下,集群的节点故障可能导致每个父RDD的数据块丢失,因此需要全部重新计算[20]。将窄依赖的RDD数据存到物理存储中可以实现优化,例如前面4.1小节逻辑回归的例子,将数据点和不变的顶点状态存储起来,就不再需要检查点操作。

***(注:
我们来阅读下org.apache.spark.rdd.ReliableCheckpointRDD中的def writePartitionToCheckpointFile 和 def writeRDDToCheckpointDirectory:

writePartitionToCheckpointFile:把RDD一个Partition文件里面的数据写到一个Checkpoint文件里面 ***

  def writePartitionToCheckpointFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableConfiguration],
      blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    //获取Checkpoint文件输出路径
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    //根据partitionId 生成 checkpointFileName
    val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
    //拼接路径与文件名
    val finalOutputPath = new Path(outputDir, finalOutputName)
    //生成临时输出路径
    val tempOutputPath =
      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

    if (fs.exists(tempOutputPath)) {
      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
    }
    //得到块大小,默认为64MB
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    //得到文件输出流
    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize,
        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
    }
    //序列化文件输出流
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    Utils.tryWithSafeFinally {
    //写数据
      serializeStream.writeAll(iterator)
    } {
      serializeStream.close()
    }

    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo(s"Deleting tempOutputPath $tempOutputPath")
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: " +
          s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
      } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
        if (!fs.delete(tempOutputPath, false)) {
          logWarning(s"Error deleting ${tempOutputPath}")
        }
      }
    }
  }
上一篇下一篇

猜你喜欢

热点阅读