RDD
版本: 2.3.0
原文链接: http://spark.apache.org/docs/latest/rdd-programming-guide.html
Overview
在上层来看,每个spark应用程序都包含一个driver 用于运行用户的main函数和在集群上执行并行操作 。 RDD(Resilient distributed dataset)弹性数据集是spark的主要抽象,它是分布在集群上的可以并行操作的元素的数据集。 RDD可以在hadoop上文件上创建,或者dirver上已经存在的scala集合创建 和转换。 RDD驻留在内存中。 RDD可以自动从节点失败中恢复。
共享变量是spark的第二个重要概念,它可以在并行操作中使用 。 默认情况下, spark以在不同的节点上运行task来实现并行计算, 每个变量的副本都会送达每个task上。 有时,需要在task之间共享变量,或在task和driver之间共享变量。spark支持两种类型的共享变量: 广播变量,用于在所有的节点上内存中缓存值; 累加器变量(accumulators),用于累计或统计,如count 、sum。
连接spark
scala
spark 2.3.0 版本默认需要scala 2.11 。
要写saprk应用,需要maven 依赖:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.3.0
另外,如果需要访问HDFS集群,需要依赖hadoop-client :
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,导入spark相关类:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
spark应用程序首先要创建 SparkContext对象 ,用于告诉应用如何访问集群。 要创建SparkContex ,需要先构建SparkConf对象 。 每个JVM只能有一个SparkContext 。 如果已有,则在创建前需要关闭stop() 。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
其中,appName是应用程序的名称,会在集群UI上显示。 master是个url用于连接集群。 实践中,不会写死master 。
各种模式下的master url : Spark, Mesos or YARN cluster URL
在spark shell中,SparkContext已经创建好,名称叫sc 。 使用自建的SparkContext可能不能工作。
--master : 指定连接参数 ;
--jars :逗号分隔列表,用于向classpath添加jar包;
--packages: 添加maven坐标指定的依赖包到classpath ;
--repositores : 可能会包含依赖包的附加仓库。
$ ./bin/spark-shell --master local[4]
Or, to also add code.jar to its classpath, use:
$ ./bin/spark-shell --master local[4] --jars code.jar
To include a dependency using Maven coordinates:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
更详细的信息请使用帮助, spark-shell --help 。
java
spark2.3 版本支持lambda表达式 。
从2.2版本起不支持java 7 。
maven 依赖:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.3.0
若访问hdfs ,则还要添加如下:
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
需要import的类:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
在使用时,首先也要创建 JavaSparkContext对象,创建时需要使用sparkconf类配置相关信息:
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
appName 和 master参数解释同scala 。
python
spark2.3版本要求的python版本为 python2.7+ 或 3.4+ 或PyPy2.3+ 。
可以使用标准的CPython解释器,这样可以使用NumPy。
从spark2.2开始, 不支持Python2.6。
Python写的spark应用可以使用spark-submit提交 或 在setup.py中包含如下:
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
当没有使用pip install PySpark时, 使用 spark-submit提交应用。 该脚本会加载spark的java/scala库并允许你提交应用到集群中。 也可以使用pyspark脚本启动交互式窗口。
若需要方法HDFS数据, 你需要使用当前的hdfs版本构建PySpark。
编程时你需要导入一些module:
from pyspark import SparkContext, SparkConf
PySpark要求在driver和worker端的python版本一致。 他使用PATH中设置的默认的python, 可以通过如下参数修改之:
$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
python 下 SparkContext对象的创建:
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName 和 master的解释同scala。
在pyspark shell中 , 已经创建好了SparkContext ,名称为sc 。 不能自建。 --master参数同上;
--py-files: 指定传给python 运行时的逗号分隔的 zip或py文件列表。
--packages: 逗号分隔的,maven坐标指定的依赖。
--repositories: 附加的可能存在依赖包的仓库。
任何spark包依赖的python模块(在requirements.txt中列出的)需要手工使用pip安装。
$ ./bin/pyspark --master local[4]
Or, to also add code.py to the search path (in order to later be able to import code), use:
$ ./bin/pyspark --master local[4] --py-files code.py
pyspark的完整选项列表, 使用 --help 。
也可以在IPython中启动pyspark 。 pyspark要的版本1.0.0+ 。 要使用IPython , 需要设置如下参数:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
要使用 Jupyter notebook :
$ PYSPARK_DRIVER_PYTHON=jupyter
$ PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
你可以通过PYSPARK_DRIVER_PYTHON_OPTS参数 定做ipython 或 jupyter 命令。
弹性数据集 RDD
可并行操作, 容错 。
两种方式创建RDD : parallelizing dirver中已有的数据集; 引用外部存储中的已有数据集 (如共享文件系统、HDFS、HBase、或提供hadoop InputFormat的任何数据源)。
Parallelized Collections
(对数据集进行分区,一遍可以并行计算,分区方法称为sc的 parallelize方法)
scala
从drvier应用中的已经存在的数据集来创建Parallelized collection ,这些方法被称为SparkContex的parallelize方法 。 示例:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
创建后,分布式数据集(distData)可以被并行操作。 例如,我们可以调用 distData.reduce((a, b) => a + b) 来处理array中的各元素相加。
一个非常重要的参数partitions 是指并行操作数据集的并行度。 spark会为每个partition启动一个task 。 典型一个cpu分2-4个partition 。 spark会根据集群配置自动进行partition 。 但是也可以手工指定,如parallelize (e.g. sc.parallelize(data, 10))
. (第二个参数指定并行数量)。
java
由JavaSparkcontext的Parallelize 方法创建 。
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
External Datasets
scala
spark可以从文件创建分布式datasets ,文件可以是本地文件、hdfs、 cassandra、hbase、 Amazon s3 等。 支持 text 文件、 SequenceFiles 和其他任何Hadoop InputFormat 。
文本文件RDD可以使用 sc的textFile方法创建。 该方法接收文件的URI( 可以是本地文件路径,或 hdfs:// 或 s3a:// 等) , 读取为按行分割的集合。 例如:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
一旦创建, distFile即可使用dtaset操作。 例如,可以使用map或reduce操作来计算长度 : distFile.map(s => s.length).reduce((a, b) => a + b).
注意:
- 当使用本地文件时, 该文件必须在各个worker节点都存在,并且路径名也要相同。
- 所有的spark的文件输入方法,包括textFile, 支持文件目录、压缩文件和通配符。 例如: textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
- textFile方法支持第二个参数, 来指定并行分区数量。 默认spark为每个block创建一个分区, 可以通过传入一个大值,来提供分区数量。 注意,分区数量不能少于blocks数量。
scala api 支持其他的数据格式:
- SparkContext.wholeTextFiles 支持读取包含多个小的text 文件的目录, 返回 (filename, content)对。 对比textFile方法,返回的是每条记录一行。 分区数量由数据局部性决定,在某些情况下可能导致分区太少。对于此情况,wholeTextFile提供第二个参数来控制最小分区数量。
- 使用SparkContext’s sequenceFile[K, V] 方法来处理sequenceFile , k 和v 是文件中的key和value的类型。 这应该是Hadoop Writable接口的子类,如 IntWritable 和 Text 。 另外,spark允许你指定本地类型, 如sequenceFile[Int, String] 将自动读取 IntWritables and Texts.
- 对于其他hadoop InputFormats , 可以使用 SparkContext.hadoopRDD 方法, 该方法接收一个 JobConf 和 input 格式类、key 类 和 value 类。 也可以使用 SparkContext.newAPIHadoopRDD方法,来处理基于“new” MapReduce API (org.apache.hadoop.mapreduce) 的InputFormat 。
- RDD.saveAsObjectFile and SparkContext.objectFile 支持以简单格式来保存RDD, 由序列号的Java对象组成。 虽然这不像Avro格式有效的保存RDD, 但是他提供了一个简单方式。
java
使用类似textFile 来创建之
JavaRDD<String> distFile = sc.textFile("data.txt");
其他内容同scala。
python
>>> distFile = sc.textFile("data.txt")
其他内容同sacla。
RDD.saveAsPickleFile and SparkContext.pickleFile 支持保存RDD ,以简单格式包含序列化 pickle python对象。 批量序列化,默认值10.
注意: 该组件当前被标记为 Experimental ,为高级用户提供。 将来将被替换为支持sparksql 。
writable support
pyspark SequenceFile 支持加载java key-value RDD , 转换Writables为java类型,使用Pyrolite序列化Java 对象 。 当保存RDD key-value 到SequenceFile , pyspark再反向处理。 将python对象转换为java读写,然后转换Writables。 下表列出会自动转换内容:
Array类型不会被处理。 用户需要自定义ArrayWritable来支持之。
保存及加载SequenceFile
同text file一样, SequenceFile在加载和保存时可以指定路径。 key vlaue 类需要指定,但是对于标准的Writables 不需要:
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
保存和加载其他 hadoop Input/Output 格式
pysaprk可以读写其他任何的hadoop 输入出格式。 如果需要, hadoop配置可以以python 字典的形式传递, 如下实例使用Elasticsearch ESInputFormat:
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
注意:
请注意,如果InputFormat仅依赖于Hadoop配置和/或输入路径,并且可以根据上表轻松转换键和值类,那么此方法应该适用于这种情况。
如果你有自定义的序列化二进制数据(比如从Cassandra / HBase加载数据),那么你首先需要将Scala / Java端的数据转换为Pyrolite pickler可以处理的数据。 为此提供了转换器特性。 只需扩展此特征并在转换方法中实现您的转换代码即可。 请记住确保此类以及访问InputFormat所需的任何依赖项都打包到Spark作业jar中并包含在PySpark类路径中。
有关使用Cassandra / HBase InputFormat和OutputFormat以及自定义转换器的示例,请参阅 Python examples and Converter examples 。
RDD 操作
RDD 支持两种操作, transformations: 用于从已经存在的dataset生成新的; actions: 将计算结果返回driver 。
例如: map 是个 transformations , 传送每个dataset的元素到一个function ,然后以一个新RDD的形式表示结果 。 reduce 是个action ,使用一些function来处理所有的rDD元素, 返回最终结果到driver (也有通过reducebyKey返回并行计算结果)。
spark中的所有的transformations 都是延迟加载的, 他们不会立马计算结果。 只有当遇到action时才会计算结果 。 该设计使spark运行的更高效 。 例如: 一个由map生成的dataset ,后面将通过reduce返回结果给driver , 不会将中间结果返回给driver 。
Basics
python
要说明RDD 基础操作, 看下面实例:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行从外部文件生成RDD 。 数据不会加载到内存中, lines只是指向了该文件。
第二行定义了lineLengths 作为map转换后的结果 。 lineLengths不会立即开始计算。
最后一行, 运行reduce , 是个action 。 此时spark将计算转换为task 可以运行在不同的机器上, 每台机器上运行各自的map和reduce ,返回给driver应用程序。
如果我们想在后续继续使用lineLengths , 我可以在reduce之前加上:
lineLengths.persist()
这会使lineLengths在第一次计算完成后保存到memory中。
scala
示例如下:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
三行的解释如上。
若想在后续继续使用lineLengths , 在reduce之前加上 lineLengths.persist() , 会将其在第一次计算完成后保存到内存中 。
java
示例如下:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
三行的解释同上。
若想在后续继续使用lineLengths , 在reduce之前加上lineLengths.persist(StorageLevel.MEMORY_ONLY()); , 会将其在第一次计算完成后保存到内存中 。
传递函数给spark
python
spark API 依赖driver传递过来的functions。 有如下三种建议方式:
- lambda 表达式 : 以表达式的形式表示的简单函数 。(lambda不支持多行表示或者返回值 )
- 对于长代码, 定义在传递给spark的代码中,以def形式。
- module中的 functions
例如,传递不能用lambda 表达式表示的长代码函数:
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
注意:可以传递方法的引用到一个类实例中,需要将包含对象的类一块发送到方法中, 如:
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
当我们创建一个类Myclass 的实例,并调用doStuff实例时,里面调用的map需要函数func , 整个对象都要发送到集群中。
同样, 访问一个对象的域 需要引用整个对象 :
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)
要避免这个问题, 最简单的方式是将域拷贝到本地变量:
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
java
使用java 传递函数到集群, 这些函数需要实现接口org.apache.spark.api.java.function 。 创建这种函数有两种方式:
- 在自己的类中实现Function 接口, 作为匿名类或有名类,传递引用至spark 。
- 使用lambda表达式 。
例如:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
或者, 可以使用内联函数,但很笨拙:
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
java的匿名内部类也可以访问封闭范围内的final标识的变量 。 spark将这些变量副本发送给每个节点(跟其他语言一样)。
scala
有两种方式传递scala函数:
- 匿名函数语法, 可用来写短片代码。
- 全局单例对象中的静态方法 , 例如可以定义对象MyFunctions ,并传递MyFunctions.func1 如下:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
也可以也可以向类实例中的方法传递引用,这要求发送的对象要包含该方法的类, 如:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
当我们创建MyClass实例 并调用doStuff方法, map 引用func1 方法, 因此整个对象都要发送到集群。 写法类似于 rdd.map(x => this.func1(x)).
同样,引用外部对象则需要发送这个对象 :
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
等价于 rdd.map(x => this.field + x) 。 要避免这个问题, 最简单的方式是将变量拷贝到本地 :
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
闭包
在集群上执行代码的变量和方法的scop和生命周期比较难理解。 RDD操作修改他们范围之外的变量经常会引发混乱。 下面的例子, 可以看到foreach()增加counter:
scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
python
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
本地模式 vs 集群模式
上面的代码不会正常工作 。 为了执行job, spark将RDD操作分割成task ,交给exector进行执行 。 在开始计算之前, spark计算task中的闭包。 这些闭包在RDD执行计算期间必须可见。闭包被序列化并发送给各个executor 。
闭包中的变量发送到各个executor上的是当时的副本, 当在foreach中引用counter时, 它不是driver上的那个counter。 在driver节点上一直会有一个counter 变量,但是对各个executor已经不可见了! executor只会看到序列化后的闭包中的counter 。 因此,打印出的counter值会是0 。
在local 模式, 某些情况下 foreach函数所在的jvm与drvier的jvm一样,这时会更新到counter。
要使上述代码正确执行应该使用 Accumulator(累加器)。 累加器在spark中提供了一种更新变量值的安全机制。 本篇的Accumulator有更加详细的信息。
一般情况下, 闭包- 如循环 或者本地定义方法不应该修改全局状态。 spark不定义和保证修改闭包引用的外部对象的正确行为。 一些代码在本地模式下工作正常,但是在分布式模式下不如预期。 在这种需要全局聚合的情况下使用累加器。
打印RDD的元素
另外一个常见的习惯用法是打印RDD的元素, 使用方法: rdd.foreach(println) or rdd.map(println). 在单机上,这回输出期望的结果。 但是在集群模式下, executors调用的stdout 不会将输出输出值driver,会在execotor本地打印。 要打印所有的元素,可以使用 collect() , 他会将所有的RDD元素聚合到drvier节点, 但这可能会撑爆driver的内存( rdd.collect().foreach(println)) 。 比较安全的方式是使用 take方法: rdd.take(100).foreach(println).
使用 key-value 对
大多数的spark RDD操作支持所有的类型, 少部分支持只支持key-value对。 最常用的是shuffle操作, 如按key进行分组或聚合。
python
在Python中, 支持python内置的tupe 。
例如, 下面例子在键值对上使用reducebyKey操作, 以统计行的出现次数:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
我们也可以使用 counts.sortbyKey() 将键值对按字母排序, 最后使用counts.collect()发送到driver。
java
在java中, 键值对使用Scala标准库scala.Tuple2 scala.Tuple2
表示 。使用 new Tuple2(a,b) 来创建一个tuple , 然后使用tuple._1() 和 tuple._2() 来方式减值。
RDDs键值对使用类 JavaPairRDD 表示。 可以从JavaRDDs 使用特殊的map操作(如: mapToPair and flatMapToPair) 来创建JavaPairRDD 实例。 JavaPairRDD 有标准的RDD函数和特殊的键值对函数。
例如, 下面示例使用reducebyKey 操作来统计每行在文本中出现的次数:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
我们也可以使用 counts.sortByKey() 排序 , 然后使用counts.collect() 将结果发送给driver。
注意: 当在键值对操作中,使用自定义的类作为key时, 必须确保自定义的equals()函数同时定义hashCode()函数。 详细信息参见: Object.hashCode() documentation.
scala
在scala , 这些操作只在包含Tuple2的RDD上可用。 键值对操作在类PairRDDFunctions中, 他自动wrap一个RDD。
例:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
也可以使用 counts.sortByKey(), 再使用counts.collect() 发送到driver上。
注意: 当使用自定义类作为key时, 该类需要实现equals() 和 hashCode()方法。 详情请参见 Object.hashCode() documentation.
transformations
下表列出了常用的transformations操作, 参见RDD API文档 (Scala, Java, Python, R)
, 和 键值对RDD 文档 (Scala, Java)。
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. (通过将每个元素传给func返回一个新的数据集) |
filter(func) | Return a new dataset formed by selecting those elements of the source on which funcreturns true. (返回经过func函数为true的元素组成的新的数据集) |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). (类似map, 但是每个输入的item可以被映射为0或多个输出item,func函数应该返回一个序列而不是一个item) |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. (类似map, 但是在RDD的每个分区单独计算, func应该是iterator类型) |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.(类似mapPartitions,但是会指定一个整形来指定分区,所以func的参数形式应该是 ) (Int, Iterator<T>) => Iterator<U> |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.(从数据中抽样fraction(分数)的数据,使用或不使用替换,使用给定的随机数种子。) |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument.(返回并集) |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. (返回交集) |
distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset.(返回新的唯一的数据集) |
groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.(当使用在K,V对上,返回一个新的数据集对(K,Iterable<V>). 若为了聚集而对每个key进行grouping,像sum 或 average, 使用reduceByKey 或aggregateByKey性能会更好; 默认情况下并行度参数依赖于RDD的分区数,可以传入一个分区数参数以设置并行度。) |
reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. (对应(K,V)使用, 返回一个新的(K,V),每个key对应的值会通过func函数聚合,func的参数应该是(V,V)=> V. reduce任务的数量可以通过的第二个参数控制。) |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument.(对于(K,V)对返回一个新的(K,U)对, 对每个key的所有值和一个初始的中性值使用给定的combine函数聚集。允许聚集的值的类型与输入不同。 第二个参数指定并行度) |
sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.(当对数据集对(K,V)使用时,要求K实现了Ordered接口,返回排序后结果,参数可以指定升序或降序,第二个参数指定并行度) |
join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin , rightOuterJoin , and fullOuterJoin . (对两个数据集(K,V)和(K,U)使用, 返回(K,(V,U))。外连接使用 leftOuterJoin 、rightOuterJoin 和fullOutJoin ) |
cogroup(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith . (对(K,V)和(K,W)使用, 返回 (K,(Iterable<V>, Iterable<W>)), 也可以使用groupwith) |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). (当对类型T和U使用, 返回(I,U)对) |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. (对每个RDD使用shell命令, RDD元素被作为stdin写入, 返回的stdout 作为一个新的RDD) |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.(降低分区的并行数。当对一个大数据集过滤后使用非常有用,会提供操作效率) |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.(对RDD的数据重新洗牌,创建多余或少于的分区以平衡集群数据,这会引起网络传输数据。) |
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. (根据给定的分区参数进行重新分区并在每个分区按key进行排序。这笔repartition更高效,因为他将排序放到了shuffle机器中做。) |
Actions
下表列出了通用actions。 详细信息请参考 RDD API doc (Scala, Java, Python, R)
和 pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.(使用func聚合dataset中的每个元素, func输入两个参数返回一个, func应该是可以关联的且可交换的,以保证并行计算的正确性。) |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.(向driver返回所有的dataset元素,以数组的形式,这适用当过滤或其他操作后,数据集中数据比较少的情况) |
count() | Return the number of elements in the dataset. (返回数据集中元素个数) |
first() | Return the first element of the dataset (similar to take(1)). (返回数据集第一个元素,同take(1)) |
take(n) | Return an array with the first n elements of the dataset. (以数组的形式返回数据集中前n个元素) |
takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. (以数据的形式返回随机num个样本, 可以使用替换,可以输入随机种子) |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. (返回前n个RDD元素,使用自然顺序或自定义比较器) |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. (向给定的路径目录写入数据集,写为text文本文件形式,支持本地文件系统、hdfs或任何支持的hadoop文件系统。 每个元素的toString方法被调用,作为一行写入文件中。) |
saveAsSequenceFile(path) | |
(Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). (向给定的文件路径将数据集的元素写为SequenceFile , 这对实现了Writable接口的的RDD key-value对有用。) |
saveAsObjectFile(path) (Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile() .(将数据集写为简单文件格式,使用java的序列号接口,该文件可以使用 SparkContext.objectFile() 加载) |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. (只对RDD的(K,V)有用,返回(K,int)值为每个key的数量) |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. (对数据集的每个元素使用func 。 这通常用于更新累加器或与外部存储交互。注意,修改foreach之外的累加器可能会导致未定义的操作,参见闭包) |
RDD API还提供上述aciton的异步版本, 如foreach的 foreachAsync , 他立即将FutureAction返回给调用者, 而不是等待计算完成。 可以使用FutrueAction进行异步执行的管理等。
Shuffle 操作
Spark中的某些操作会触发一个称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在不同分区之间进行分组。这通常涉及在执行者和机器之间复制数据,使得洗牌成为复杂而昂贵的操作。
Background
以reduceByKey操作来看下究竟发生了什么? reducebykey操作会生成一个新的RDD , 同一个key的所有的value组成一个元组。 但是并非同一个key的元素都在同一个分区、或同一个机器。
在计算时,需要从所有的分区中找同一key的数据,然后将他们汇总一块, 这个过程称为shuffle。
若希望在shuffle时对元素进行排序, 使用下面的方法:
- mapPartitions , 使用sorted 排序
- repartitionAndSortWithinPartitions 在重新分区同时进行排序
- sortBy 对RDD全局排序
会引起shuffle的操作有 repartition类操作,如 repartition
and coalesce
,
ByKey类操作(除了counting)如 groupByKey
and reduceByKey
,
Join类操作: 如 cogroup
and join
.
性能影响
shuffle操作非常昂贵, 他涉及到 disk I/O, data serialization, and network I/O. 为了shuffle, spark 生成一组map 任务来组织数据, 生成一组reduce任务来聚合之。这里的概念来自mapreduce , 并不直接与spark的map reduce关联。
在内部, map任务的结果会保存在内存中,知道不能存下。 然后会根据分区进行排序并写入文件。 对于reduce 任务相关的排序blocks。
一些shuffle操作会占用大量的内存,因为数据结构是存在内存中的,数据组织、转换都是内存, 当内存不足时, 会写入到磁盘上, 这会增加磁盘io和触发垃圾回收。
shuffle会在磁盘上生产大量中间文件, 从spark1.3开始这些文件不会被自动清理,直到相应的RDD不再使用并被垃圾收集为止。 这是为了避免再次使用时进行重新shuffle 。若应用程序保留对RDD的引用或GC启动不频繁,则垃圾收集可能会很久才会触发一次。 spark.local.dir指定的临时目录可能会很大。
可以通过参数调优shuffle 。 See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.
RDD 持久化
在spark的操作中会持久化和缓存内存中的数据集。 持久化时,每个节点都会存储他在内存中的所有分区,并在使用时重用他们。 这可以使将来的行动更快。 缓存是迭代计算 和交互式计算的关键。
在使用RDD时可以标记使用其persist() 或 cache() 方法。 在第一次action时, 会缓存在内存中。 spark的缓存是容错的, 若分区丢失,会自动重新计算。
每个持久化RDD可以使用不同的存款级别, 如,保存在磁盘、内存、序列化java对象等。 通过向persist()方法传递StorageLevel对象来指定 (Scala,Java, Python)。 cache()的使用默认的存储级别, StorageLevel.MEMORY_ONLY 。 存储级别如下:
注意: 在python中, 对象的存储总是使用 Pickle 库, 因此会忽略使用的序列级别。 python可用的存储级别有 MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.
spark会自动持久化shuffle操作的中间结果, 即时用户不调用persist 。 这是为了避免在洗牌过程中节点失败时重新计算整个输入。我们仍建议用户调用persist生成的RDD,如果他们打算重用它。
存储级别的选择
Spark的存储级别旨在提供内存使用和CPU效率之间的不同折衷。我们建议通过以下流程来选择一个:
-
If your RDDs fit comfortably with the default storage level (
MEMORY_ONLY
), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. -
If not, try using
MEMORY_ONLY_SER
and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala) -
Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
-
Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
删除数据
spark 自动监控缓存 , 使用LRU 删除就数据分区。 若想要手动处理, 使用 RDD.unpersist() 方法。
共享变量
通常,当spark操作函数执行时, 它将为函数中所有的变量单独拷贝一个副本传递到每个机器上, 这些副本变量不会回传回driver 。 在task之间支持通用的、 可读写的共享变量是非常低效的。 但是park提供了两种方式: 广播变量 和 累加器。
广播变量
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:
scala :
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
java:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
python:
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。
累加器
累加器是仅通过关联和交换操作“添加”的变量,因此可以有效地支持并行。它们可以用来实现计数器(如在MapReduce中)或sums 。Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。
作为用户,您可以创建命名或未命名的累加器。如下图所示,命名累加器(在本例中counter)将显示在Web用户界面中。Spark在“任务”表中显示由任务修改的每个累加器的值。
图片.png
跟踪UI中的累加器对于理解运行阶段的进度很有用(注意:Python尚未支持)。
scala
数字类型的累加器可以通过分别调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator() 累加Long或Double类型的值来创建。在集群上运行的任务使用add方法进行累计。但是,它们无法读取其价值。只有驱动程序可以使用其value方法读取累加器的值。
下面的代码显示了一个累加器,用于累加数组的元素:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
虽然此代码使用对Long类型的累加器的内置支持,但程序员还可以通过继承AccumulatorV2来创建它们自己的类型。AccumulatorV2抽象类有几个方法必须覆盖:reset
将累加器重置为零,add
将另一个值添加到累加器中,merge
将另一个相同类型的累加器合并到该累加器中 。其他必须被覆盖的方法包含在API文档中。例如,假设我们有一个MyVector
表示数学向量的类,我们可以这样写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
请注意,当程序员定义自己的AccumulatorV2类型时,生成的类型可能与添加的元素的类型不同。
累加器的更新只会在action中, spark保证每个task只会更新一次累加器。 重启任务不会更新该值 。 在transformation中 , 用户需要注意若任务或job阶段被重新执行,则每个任务只会更新一次。
累加器不会改变spark的延迟计算模式。 注意只有在RDD的action中时才可能会更新。 在延迟执行的map中不会更新。 如下:
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
java
java提供数值类型的累加器Long和Double , 通过 SparkContext.longAccumulator() or SparkContext.doubleAccumulator() 创建 。 (同scala)
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
自定义累加器类型 同scala :
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
累加器延迟更新机制 同scala
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
python
使用 SparkContext.accumulator(v) 创建, 使用add方法 或 += 操作符累计。 但是exector不能读取其值, 只能driver读取。
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value
10
虽然此代码使用的内置支持的Int类型的累加器,但程序员也可以通过继承AccumulatorParam来创建自己的类型。AccumulatorParam接口有两种方法:zero
为您的数据类型提供“零值”,并将addInPlace
两个值一起添加。例如,假设我们有一个Vector
表示数学向量的类,我们可以这样写:
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
累加器的延时特性:
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
部署到集群
在提交申请指南介绍了如何提交申请到集群。简而言之,一旦将应用程序打包为JAR(用于Java / Scala)或一组.py
或多个.zip
文件(用于Python),该bin/spark-submit
脚本可让您将其提交给任何受支持的集群管理器。
从java/scala启动作业
该org.apache.spark.launcher 包提供类启动Spark作 。
单元测试
Spark支持任何流行的单元测试框架。只需创建SparkContext,运行您的操作,然后调用SparkContext.stop()把它关闭。确保在finally块或测试框架tearDown方法中停止它,因为Spark不支持在同一程序中同时运行的两个上下文。
后续参考哪些
您可以在Spark网站上看到一些Spark程序示例。另外,Spark在examples
目录中包含了几个样本(Scala, Java, Python, R)。您可以通过将类名传递给Spark的bin/run-example
脚本来运行Java和Scala示例; 例如:
./bin/run-example SparkPi
对于Python示例,请spark-submit
改为使用:
./bin/spark-submit examples/src/main/python/pi.py
对于R示例,请spark-submit
改为使用:
./bin/spark-submit examples/src/main/r/dataframe.R
有关优化程序的帮助,配置和 调整指南提供最佳做法的信息。它们对于确保您的数据以高效格式存储在内存中特别重要。有关部署的帮助,群集模式概述描述了分布式操作中涉及的组件以及支持的群集管理器。