Spark常用概念
从Spark官网以及《Spark最佳实践》中整理得到,其中主要是给自己做记录笔记用,难免因为初学的原因有很多过时或者错误的信息。
Spark部署中常用名词
Application
这是用户在Spark上创建的。由一个Driver程序和一系列在集群上的executors组成。
Application Jar
这个是一个包含用户的Spark App的jar。有时候用户需要创建一个"uber jar"来包含他们的应用所需的依赖。不过用户的jar不应该包含Hadoop或者Spark的lib,因为这些会在运行的时候加入。
Driver Program
这是一个运行App的main()函数的进程,并创建SparkContext
Cluster Manager
这是一个外部的服务,用来在集群上获取资源。比如standalone manager,Mesos,YARN
Deploy mode
区分Driver程序运行的方式。"cluster"模式下framework在集群内启动driver,"client"模式下由submitter在集群外启动driver。
Work Node
在集群中运行App代码的一个物理机器。仅在测试的时候出现一台物理机器上启动多个节点。
Executor
在worker节点中为App工作的进程,负责运行tasks并维护内存和硬盘中的数据。每一个App在节点上都有一个自己的Executor。与App有相同的生命周期,当然一个节点上可能运行多个App,那么也就对应多个Executor。
Task
发送给一个Executor的最小工作内容单位。
Job
这是一个并行计算由多个tasks组成,用来响应Spark的action操作。比如save、collect等。在Driver的log中你可以看到这个。(一次RDD action对应一个Job,会提交到集群资源管理器调度执行)
Stage
每一个Job被分割为多个task的小集合被称为Stage,并且相互依赖(类似于MapReduce中map和reduce的stages)。同样也可以在Driver的log中看到。(Stage介于Job和Task之间,是按Shuffle分割的task集合。当然如果没有Shuffle操作,对于Action也会生成一个默认Stage)
总结:
用户写好Application及其依赖的Application Jar后,Driver程序负责加载并产生SparkContext,并根据Deploy的模式不同和Deploy的manager建立联系,取得集群的资源,也就是在各个Worker节点上创建一个专属于某个App的一个Executor。然后对RDD的每一次action操作会产生一个Job,这个Job会根据Shuffle的需要分成多个Stage并最终分成一个个的task发给Executor来执行。
RDD介绍
RDD特点
1、RDD是一个数据集
2、RDD是分布存储的(被水平切割成小的数据块,分散在集群的多个节点上。便于并行计算。)
3、RDD的分布是弹性的。(I,故障导致需要转换到其他节点;II,有些操作需要汇聚并重新分布,比如groupBy)
4、RDD是只读的,一旦生成就不能修改。
5、RDD可以指定缓存在内存中。可以让被依赖的RDD放入内存中防止被回收,避免反复计算。
6、RDD可以通过重新计算得到。可靠性不是靠复制,而是通过记录足够的计算过程,需要的时候可以从头或者镜像中计算得到来恢复。
RDD 对象的核心属性
下文参考的是1.4.1版本的Spark。有点老,只能当个大概看看,最新版还是要查看源码:RDD.scala
1、一个分区列表(var partitions_ : Seq[ Dependency[ _ ] ])
每个分区里是RDD的部分数据。
2、一个依赖列表(var dependencies_ : Array[Partition])
存储依赖的其他RDD
3、一个名为compute的计算函数(def compute(split: Partition, context: TaskContext): Iterator[T])
用于计算RDD的各个分区的值
4、分区器(可选,对应l连不连续分区)(val partitioner: Option[Partitioner])
用于键/值类型的RDD,比如某个RDD是按散列来分区的。
5、计算各分区时优先的位置列表(可选)(def getPreferredLocations(split: Partition): Seq[String])
比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。
RDD的Transformation
RDD的Transformation是指由一个RDD生成新的RDD的过程。记住,所以的RDD Transformation都只是生成了RDD之间的计算关系以及计算方法,并没有真正的计算。
RDD的多个Transformation操作会生成一个DAG依赖关系链表。理解了RDD的依赖关系也就理解了RDD的重建容错机制。
Spark RDD的依赖关系有两种类型:
窄依赖。依赖上级RDD的部分分区
Shuffle依赖。依赖上级RDD的全部分区
另外如果依赖的链条太长,那么通过计算来恢复的代价就太大了。所以Spark又提供了一种检查点机制。对于依赖链条太长的计算,对中间结果存一份快照,这样就不需要从头开始计算了。比如Spark Streaming流式计算。
RDD的Action
Transformation是从一个RDD生成新的RDD,而Action代表计算结束,一次调用Action之后不再产生新的RDD,结果直接返回到Driver程序中。
所以Transformation的代码执行中不能加入Action操作。不能rdd.map(x => rdd2.values.count() * x)这样在map中加入action操作。
Transformation只是建立关系,Action操作才是真的实际执行者。每个Action都会调用SparkContext的runJob方法向集群正式提交请求,所以每个Action都对应一个Job。最后Job在集群的执行结果会返回给Driver。
RDD的Shuffle
Shuffle的概念来自于Hadoop的MapReduce计算过程。比如对<Key, Value>类型的RDD操作,当涉及按照Key对RDD成员重组时,因为具有相同Key的成员分布在不同的节点上,所以需要现将这些成员都汇集到一个节点上,然后再对他们的Value进行操作。这个重组过程就是Shuffle操作。
Shuffle是一个非常耗资源的操作,会涉及大量的网络IO,内存的使用,而且会产生大量的临时文件在磁盘上。所以分Stage与之对应,每个Stage包含许多Task。
SparkContext
SparkContext是Spark程序最主要的入口,用于与Spark集群连接。与Spark集群的所有操作都通过SparkContext来进行。包括创建RDD、计算器和其他广播变量等。
所有的Spark Application都必须创建一个SparkContext对象。进行流式计算时使用的StreamingContext,以及进行SQL计算时使用的SQLContext,也会关联一个现有的SparkContext或者隐式创建一个。
源码文件:SparkContext.scala
SparkConf配置
初始化SparkContext时,只需要一个SparkConf配置对象作为参数即可。
主要的配置参数:
master:集群地址
appName:程序名,会显示在集群的Web UI上
sparkHome:Spark在机器上的安装目录
jars:给集群添加额外的JAR文件集合,可以是本地目录或者网络URL
environment:环境变量
其中SparkConf类的主要成员是一个散列表,其中Key和Value的类型都是String。
private val setting = new ConcurrentHashMap[String, String] ()
实际使用SparkConf.set("spark.master", strValue)等,或者setMaster(strValue)等
这里再额外讲下配置SparkContext的常见三种方式:
1、在配置文件中配置,"$SPARK_HOME/conf/spark-defaults.conf"
2、在命令行中--master之类的配置
3、在代码中对SparkConf配置
这三种配置都可以达到一样的目的,只不过有优先级顺序,从上倒下优先级越来越高。高优先级的配置内容会覆盖低优先级的配置。
而且每个JVM只允许启动一个SparkContext,如果要再启动一个那么需要把之前一个停止:sc.stop()或者设置spark.driver.allowMultipleContexts为true来阻止抛出异常。
初始化过程
除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器。
DAG调度和Task调度的区别是,DAG是最高级别的调度,为每个Job绘制出一个有向无环图,跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGSchedule的初始化必须在Task调度器之后。
DAG与Task这种分离设计的好处是,Spark可以灵活的设计DAG调度,同时还能与其他的资源调度系统结合,比如YARN、Mesos。
其他功能
创建RDD:parallelize和textFile newAPIHadoopFile
RDD持久化:persistRDD、unpersistRDD
创建共享变量:包括计数器和广播变量
stop()
runJob