Spark Core
2019-01-15 本文已影响0人
436048bfc6a1
1.spark core
1.1 学习方法
对于关键类的关键方法一定看相应的注释
1.2 什么是RDD
A Resilient Distributed Dataset (RDD),
the basic abstraction in Spark.
弹性的分布式数据集(RDD),是spark中最基本的抽象
解释
1. 弹性指的是:
eg. RDDA =map=> RDDB =map=> RDDC
当RDDC的机器failure, 可以从RDDB计算出RDDC
所以弹性指的是从错误恢复的特性
represents an immutable, partitioned collection of elements
that can be operated on in parallel
代表一个不可变的,可以并行操作的分区的元素的集合
解释:
(1) 不可变:
A => map => B, 但是在这里A和B不一样
(2) 分区的集合
eg. List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
1, 2, 3 在第一台机器上
4, 5, 6 在第二台机器上
7, 8, 9, 10 在第三台机器上
This class contains the basic operations available on all RDDs
such as `map`, `filter`, and `persist`
class RDD 包含了在所有RDD上可用的基本操作
如map, filter和persist
In addition, org.apache.spark.rdd.PairRDDFunctions
contains operations available only on RDDs of key-value
pairs, such as `groupByKey` and `join`;
除此之外,org.apache.spark.rdd.PairRDDFunctions
包含了只在key-value形式的RDD上才可以使用的操作,
如groupby和join
解释:
(1) 如 select from * A join B on a.id = b.id;
如果在spark里执行类型这种语句,id就是key,其他则为value
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs
(e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on
(e.g. block locations foran HDFS file)
内部上说,每个RDD特性都由以下五个主要性质决定的
- 有一组分区
- 有函数可以作用到每个分区上
- 有一组作用在其他RDDs上的依赖
- 可选,对于key-value RDDs有一个分区器
- 可选,有对于计算每个split都有一组喜好的位置
解释
- 一个RDD是由分区构成的
- 对RDD所使用的函数其实是作用到其所有分区上
eg .rdd.map() <==>rdd.split1.map() + rdd.split2.map()
在spark里partition <==> task, 有几个partition就有几个task
- eg. RDDA =map=> RDDB =map=> RDDC
RDDC依赖于RDDB和RDDA
- 对于一个有三副本的文件,其存在三台机器上
file1: hadoop001 => replica1
hadoop002 => replica2
hadoop0003 => replica3
最好的是将计算放在这三个机器上,但是有时由于三台机器繁忙,没有资源进行计算,此时有两种选择,拷贝数据或者等待
但是最好选择等待,以为拷贝数据花费时间更多,消耗资源更大,所以
移动计算 > 移动数据
1.3 源码解释
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Loggin
解释
1. RDD是抽象类,需要实现
2. 继承了序列化,因为要进行网络传输
3. Serializable 和 Loggin 都是trait,正常写法是 with Serializable with Loggin
但是由于scala特性,第一个trait可以写extends,不用写with
下面是RDD的其中一个实现类HadoopRDD
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging
1.3.1 源码中体现RDD的五大特性
Implemented by subclasses to return the set of partitions in this RDD. This method will only
be called once, so it is safe to implement a time-consuming computation in it.
The partitions in this array must satisfy the following property:
rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }
protected def getPartitions: Array[Partition]
对应RDD的第一个特性
Implemented by subclasses to compute a given partition.
def compute(split: Partition, context: TaskContext): Iterator[T]
对应的是RDD第二个特性
Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
be called once, so it is safe to implement a time-consuming computation in it.
//Seq[Dependency[_]代表集合,指多个Dependencies
protected def getDependencies: Seq[Dependency[_]] = deps
对应RDD第三个特性
1.3.2 在RDD的实现类是如何体现的(HadoopRDD)
//返回分区数组
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
try {
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
case e: InvalidInputException if ignoreMissingFiles =>
logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
s" partitions returned from this path.", e)
Array.empty[Partition]
}
}
官网原文1
Spark revolves around the concept of a resilient distributed dataset (RDD),
which is a fault-tolerant collection of elements that can be operated on in parallel.
There are two ways to create RDDs: parallelizing an existing collection in your driver program,
or referencing a dataset in an external storage system,
such as a shared filesystem, HDFS, HBase,
or any data source offering a Hadoop InputFormat.
翻译1
RDD的概念是spark重要的元素。
RDD就是一个可以并行操作的容错的集合。
有两种方式可以创建RDD
在driver program中并行化一个现有的collection
在外部存储系统(如HDFS、HBASE或者其它能提供InputFormat的数据源)refer一个dataset
解释1
第一种在测试时候使用
源代码1
def parallelize[T: ClassTag](
//seq可以理解为数组
seq: Seq[T],
//numSlices有默认值,默认值为defaultParallelism
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
源代码1注释
Methods for creating RDDs
Distribute a local Scala collection to form an RDD
创建RDD的方法
为了生成RDD,提供本地的scala集合
源代码2
//ParallelCollectionRDD继承与RDD
//说明ParallelCollectionRDD也是RDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {......}
源代码注释2
Return an array that contains all of the elements in this RDD
返回一个包含在此RDD的所有元素的array
源代码3
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
源代码注释3
Read a text file from HDFS, a local file system (available on all nodes),
or any Hadoop-supported file system URI, and return it as an RDD of Strings.
从HDFS或者本地文件系统(在所有节点上都可用)
注: 也就是说要保证所有节点上相同路径相同的数据
源代码4
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
代码1
function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}
代码解释1
#在此设置默认参数
#调用spark-submit函数
${SPARK_HOME}"/bin/spark-submit
--class org.apache.spark.repl.Main
--name "Spark shell"
"$@"
example1
complete_jobs.png
上图中,显示调用一次collect,完成1个job
注意1
如果spark是一个有三台机器组成的集群,集群的机器名分别为
HDFS1
HDFS2
HDFS3
如果使用textFile()读取文件,必须要保证该文件在三台机器上都存在
否则的话会报org.apache.hadoop.mapred.InvalidInputException: Input path does not exist错误
官网原文及翻译1
If using a path on the local filesystem,
the file must also be accessible at the same path on worker nodes.
Either copy the file to all workers or use a network-mounted shared file system.
如果使用本地文件系统,该文件必须能在其他的节点上的相同路径上能被访问
要么将文件拷贝到所有的工作节点上
要么使用网络共享文件系统(如HDFS)
All of Spark’s file-based input methods, including textFile,
support running on directories, compressed files, and wildcards as well.
For example, you can use textFile("/my/directory"),
textFile("/my/directory/*.txt"),
and textFile("/my/directory/*.gz").
所有spark的基于文件的input方法,包括textFile()方法,
支持读取目录,压缩文件并且支持正则表达式
The textFile method also takes an optional second argument
for controlling the number of partitions of the file.
By default,
Spark creates one partition for each block of the file
(blocks being 128MB by default in HDFS),
but you can also ask for a higher number of partitions by passing a larger value.
Note that you cannot have fewer partitions than blocks.
textFile方法可以接受第二个可选的参数
该参数能够控制文件分区的数量,即minPartitions
默认情况下,Spark能够为文件的每个block创建一个分区
(在HDFS上,一个block是128M)
但是可以通过传递一个更大的值来要求更多的分区数量
也就是说并行度提高了,同一时刻处理数据的job多了,
在资源足够情况下,性能也就越高
不能让分区数比block小
但是分区数可以比block小,但是会造成每一个task处理的数据就更多了,
因为会发生一个文件会被拆分成很大的+很小的
!!!暂时不知道会造成什么问题,以后解决
(默认情况下,eg.一个有10个block的文件可以被拆分为10个分区
如果分区数比block小,partition1是整个block1+一小部分block2
一个task运行一个partition,此时partition就会比原来的大
)
Apart from text files,
Spark’s Scala API also supports several other data formats:
除了text file,spark也支持其他的数据类型
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files,
and returns each of them as (filename, content) pairs.
This is in contrast with textFile,
which would return one record per line in each file.
Partitioning is determined by data locality which,
in some cases, may result in too few partitions.
For those cases, wholeTextFiles provides an optional second argument
for controlling the minimal number of partitions.
SparkContext.wholeTextFiles让你读一个包含多个小的text文件的目录
已(filename, content)形式返回一个key-value对。
这个是与textFiles进行对比,它的一个record就是任意文件的一行
分区是由数据位置决定的,可能会造成分区过小
所以,wholeTextFiles 提供另一个参数来控制最小分区
For [SequenceFiles], use SparkContext’s `sequenceFile[K, V]` method
where `K` and `V` are the types of key and values in the file
对于sequenceFile,使用SparkContext的sequenceFile[K, V]方法
其中k是key, v是value
For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method
对于其他Hadoop的InputFormats,
可以使用SparkContext.hadoopRDD 方法
RDD.saveAsObjectFile and SparkContext.objectFile support
saving an RDD in a simple format consisting of serialized Java objects
RDD.saveAsObjectFile和SparkContext.objectFile
支持
已由连续的java object所组成的简单形式来保存RDD
源代码5
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[Text],
classOf[Text],
updateConf,
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}
源代码注释4
Read a directory of text files from HDFS,
a local file system (available on all nodes),
or any Hadoop-supported file system URI.
Each file is read as a single record and returned in a
key-value pair,
where the key is the path of each file, the value is the content of each file.
读取一个HDFS(或本地文件系统或任意Hadoop支持的文件系统的URI)上的text文件的目录
每一个文件当读取时都会被当成单个记录
返回是一个key-value对
key就是每个文件的路径,value就是每个文件的内容
Small files are preferred, large file is also allowable, but may cause bad performance
小文件适合用(但是不常用),大文件也会被允许使用,但是会造成性能上的损失
官网原文及翻译2
RDDs support two types of operations: transformations,
which create a new dataset from an existing one,
and actions,
which return a value to the driver program after running a computation on the dataset.
For example, map is a transformation that passes each dataset element
through a function and returns a new RDD representing the results.
On the other hand, reduce is an action that aggregates all the elements of the RDD
using some function and returns the final result to the driver program
(although there is also a parallel reduceByKey that returns a distributed dataset).
RDD支持两种类型的操作:
transformations
从已有的数据集创建新的数据集
也就是说
RDDA ==transformation==>RDDB
transformation包括map、filter等
RDD在代码层面上来看就是一个类
actions
在经过在数据集上计算之后,返回给driver program一个值
例如: map是一个transformation, 通过一个函数来讲数据集的每个元素传入
返回一个代表了结果集的新的RDD
在Spark上的transformation是lazy的
也就是说当在RDD上进行transformation, 是不启动task
因为对于RDD的transformation,是pipeline操作的
这也符合RDD特性之一: dependency
所以需要等待transformation操作都输入后, 才启动task对其进行pipeline操作
但是当执行action操作,就必须立刻启动task执行操作
所以此时,启动一个task,进行transformation,然后进行action操作
他们不会直接计算结果,相反,他们会记住应用在base dataset上的transformation
transformation只有当action需要结果时才会计算
并且将该结果返回给driver program
此设计可以使spark高效的运行
例如: datasets经过map之后的数据集用于reduce action上
只有reduce的结果返回给driver program,而不是将mapped 数据集返回给driver program
源代码6
def sortBy[K](
f: (T) => K,
//默认是升序
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
术语解释1(重要)
Application:
User program built on Spark
Consists of a driver program and executors on the cluster
应用:
构建在spark上的用户应用程序(eg.idea上的scala object)
由在集群上的一个driver program和多个executors所组成
Application jar
A jar containing the user's Spark application
In some cases users will want to create an "uber jar"--
--containing their application along with its dependencies
The user's jar should never include Hadoop or Spark libraries, --
--however, these will be added at runtime.
Application jar
一个jar包含了用户的Spark应用程序
Driver program
The process running the main() function of the application--
--and creating the SparkContext
Driver program
运行应用main()方法的进程,并且能创建SparkContext
所以在main方法里创建SparkContext的程序就是driver program
Cluster manager
An external service for acquiring resources on the cluster
(e.g. standalone manager, Mesos, YARN)
在集群上申请资源的外部的服务
好处: 代码开发过程中不用关注代码运行在哪里
运行各种模式下,其代码都是相同的
Deploy mode
Distinguishes where the driver process runs.
In "cluster" mode, the framework launches the driver inside of the cluster
In "client" mode, the submitter launches the driver outside of the cluster
Deploy mode
分辨driver process运行在哪里
在集群模式, 框架在集群内启动框架
在client模式, submitter在cluster外面启动driver
Worker node
Any node that can run application code in the cluster
Worker node
在集群上能够运行应用的node被称为Worker node
Executor
A process launched for an application on a worker node
that runs tasks and keeps data in memory or disk storage across them
Each application has its own executors
Executor
启动一个服务于worker node(eg. node manager)上的进程, 运行在container里
运行tasks(map或filter),并且可以将数据放于内存中或者是跨节点的磁盘上。
每个应用程序有其独立的executors
Task
A unit of work that will be sent to one executor
Task
发送给executor的工作单元
Job
A parallel computation consisting of multiple tasks
that gets spawned in response to a Spark action (e.g. save, collect)
Job
由多个task组成的一个并行计算
一个action触发一个job
Stage
Each job gets divided into smaller sets of tasks
called stages that depend on each other
similar to the map and reduce stages in MapReduce
Stage
1个job会被分成多个stage
遇到shuffle就产生新的stage
figure1
spark开发流程.png