大数据,机器学习,人工智能Spark学习之路

Spark简明笔记

2018-11-24  本文已影响0人  西北偏北

一、Spark结构

1542185452899.png

二、如何部署Spark程序

以scala为例,我们通过IDE编写Spark应用后,将其打包成jar包,然后使用spark-submit程序进行部署

 ./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

2.1 上述master参数可配置的值如下

1542188212665.png

2.2 Spark配置优先级

优先级从高到低依次是:

三、RDD

RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,可能是文件,也可以是hadoop。通过RDD可以进行tranformation和action操作,从而实现分布式计算。

3.1 关键数据结构

一个RDD具有以下固定的数据结构

总结来说,一个RDD的关键信息无非是,定义了数据来源,数据分布存储的情况,以及准备执行的计算逻辑。通过这些新,我们可以构建一个图,图的两个vertex分别是RDD,edge为computation

  private[spark] def conf = sc.conf
  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]//当前RDD需要执行的计算

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]//当前RDD对应的分区

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps//当前RDD依赖的父亲数据集

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null //当前RDD的名称

3.2 RDD 特点

四、Spark的计算流程

1542623572371.png

五、RDD Transformation

将RDD进行一系列变换,生成新的RDD的过程,叫做Transformation。所有那些可以就地计算,而不需要数据迁移的transformation叫做Narrow Transformation。

5.1 transformation大概源码

以map操作为例

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //将传进来的函数f进行clean,这里先不深究,只需要知道clean后的函数,跟原函数功能相同
    
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    //这里返回MapPartitionsRDD对象,其构造参数为当前RDD和一个将f应用于迭代器的函数定义
  }

5.2 flatMap

map操作是将迭代RDD中的每个元素,然后将其做一定加工,返回的的依然是一个元素。而flapMap接受的函数参数的入参是RDD中的每个元素,但对该元素处理后,返回的是一个集合,而不是一个元素。flatMap源码如下:

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

总结来说,map和flapMap的异同点如下:
- map接受的函数参数签名是:(f: T => U)而flatMap接受的函数参数签名为:(f: T => TraversableOnce[U]),可以看到返回的是集合

5.2 Narrow Transformations

Narrow Transformation操作有


1542189363924.png

5.3 Wide Transformations

有些计算,需要依赖其他节点数据,这种计算会导致数据移动,成为Wide Transformations。比如,基于某个key分类的操作GroupByKey,这个Key可能散落在不同的work node上,为了进行GroupByKey计算,需要计算节点间进行数据移动,比如将某个Key对应的数据,统一移动到一个节点上。Wide Transformation操作有如下:


1542189625738.png

六、RDD Action

所有Tranformation操作,都不会真正执行,直到Action操作被调用,Action操作返回是具体值,而不是RDD。这种特性成为Lazy Computing.
Action操作触发后,会将执行结果发给Driver 或者写如到外部存储。以下操作属于Action操作:
First(), take(), reduce(), collect(), count()

6.1 关键action源码

所有action操作,最终都会调用SparkContext的runJob方法。runJob有需多重载方法,以其中一个为例

def runJob[T, U: ClassTag](
      rdd: RDD[T],//需要处理的RDD数据
      processPartition: Iterator[T] => U,//需要在每个数据分区上进行的操作
      resultHandler: (Int, U) => Unit)//如何将上述每个分区处理后的结果进行处理

可以看到runJob中体现了所有分布式计算理论架构,即MapReduce。其中processPartition定义每个分区要需要做的map操作,这一步将减少数据量,将map操作的结果做为输入,传进reduce操作,进行汇总处理。

6.2 aggregate

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
    sc.runJob(this, aggregatePartition, mergeResult)//4
    jobResult//5
  }
  1. 定义一个结果汇总变量,它将存储aggregate方法最终的返回结果,初始值为zeroValue
  2. 在每个RDD数据分区上,使用迭代器,应用aggregate方法,初始值为都为zeroValue
  3. 对每个分区的结果,使用combOp方法进行汇总计算。输入index为分区的编号,taskResult为上一步每个分区计算后的结果,同汇总变量jobResult再来进行combOp计算。从第一步可知,jobResult的初始值为zeroValue
  4. 将上述两个函数作为入参,传递给sc.runJob方法,在spark集群进行执行
  5. 返回结果

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.aggregate(3)(
      (acc, value) => {
        println(acc+":"+value)
        (acc + value._2)
      },
      (acc1, acc2) => (acc1 * acc2)
    )
    println(result)//输出4032

解释:

6.3 fold

fold函数同aggregate类似,同样是调用SparkContext的runJob函数,只不过fold只接受一个值参数,和一个函数参数,其内部在调用runJob时,分区计算和结果计算都使用同样的函数。源码如下:

  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
  (acc, ele) => {
    println(acc+":"+ele)
    ("result",acc._2 + ele._2)
  }
)
println(result)//输出:(result,83)

假设注释1中切分的2个分区为("maths", 21)和("english", 22),("science", 31),那么执行过程如下:

  1. 3+21=24
  2. 3+22+31=56
  3. 3+24+56=83

6.4 reduce

reduce同样调用了SparkContext的runJob函数,但reduce接收的参数在fold上进一步简化,少了zeroValue参数,只接收一个函数参数即可。同样该参数,在调用runJob时,即作为分区收敛的函数,记作为分区汇总计算的函数

  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.reduce(
      (acc, ele) => {
        println(acc+":"+ele)
        ("result",acc._2 + ele._2)
      }
    )
    println(result)//结果为:(result,74)

6.5 collect&top

collect和top方法都会将数据收集到driver本地,前者是收集全部,后者是收集指定条数。所以最好知道收集的数据集较小时使用。否则会有很大的性能问题,比如大数量的传输,以及driver本地的内存压力

6.6 reduce和reduceByKey

前者是action操作,后者是transformation操作

七、RDD cache优化

RDD的数据,来至于外部存储介质,比如磁盘。而每一次用该RDD,都要去磁盘加载,这有时间和性能上的损耗。可以使用rdd的cahce方法,将该RDD缓存到内存,这样后续重复使用该RDD时,直接去内存拿。
cache的几个级别

7.1 stage

按数据是否在分区间迁移,来划分stage。一个stage有多个task,他们会并发的在不同的分区上执行相同的计算代码。比如紧邻的map和filter就会被划在同一个stage,因为他们可以并发在各分区上执行,而不需要数据移动。而reduceByKey则会单独成为一个stage,因为其涉及到数据移动

八、RDD lineage & DAG

RDD
从一个RDD转化成另一个RDD时,每一步都会记录上一个RDD关系。于是这形成一个血统谱系。具体

    val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    println(wordCount1.toDebugString)

最终输出:

(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
 +-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
    |  MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
    |  InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
    |  InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []

可以看到结果以倒序的方式输出,有点像java异常时,打出的依赖栈。从最近的依赖点,一直回溯

九、DataFrame

在RDD上进一步封装的数据结构。这种数据结构可以使用SparkSql去操作处理数据,这降低了对分布式数据集的使用难度。因为你只要会sql,就可以进行一些处理

十、GraphX

十一、 如何调优

一个Spark应用最会对应多个JVM进程。分布式driver,以及该应用在每个worknode上起的JVM进程,由于driver担任的协调者角色,实际执行是worknode上的EXECUTOR,所以对于JVM的调优,主要指对Executor的调优。这些JVM进程彼此会通信,比如数据shuffle。所以优化Spark应用的思路主要从以下个方面入手:

11.1 序列化

通过sparkConf conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)来配置,指定数据对象的序列化方式

十二、参考资料

https://data-flair.training/blogs/spark-tutorial/
https://spark.apache.org/docs/1.6.1/

上一篇下一篇

猜你喜欢

热点阅读