《Spark指南》四、编程指引-Scala篇(上)
本文主要翻译至链接且不局限于该文内容,也加入了笔者实践内容,翻译水平有限,欢迎指正,转载请注明出处。
概述
每个Spark应用程序都包含了一个驱动程序,用于执行用户编写的main函数,以及在集群上执行各种并行操作。Spark抽象了一个称为RDD(resilient distributed dataset,弹性分布式数据集)的数据集,它是一个数据元素的集合,这些元素被分散在整个集群的各个节点上,可以进行并行操作。RDDs可以从Hadoop的文件系统中创建(HDFS,或其他Hadoop支持的文件系统),或者从驱动程序中已存在的Scala集合创建,然后进行转换。用户可以在Spark中设置将RDD驻留在内存,以便在并行操作中可以高效重用。此外,RDDs也支持从节点故障中自动恢复。
Spark还抽象了一个在并行操作中使用的称为“shared variables”(共享变量)的概念。默认情况下,当Spark在集群的不同节点中执行一系列并发作业时(它们执行一个相同的函数),它将会把函数中的变量拷贝到每一个作业上。这些变量有时候需要在作业之间共享,有时候也会在作业和驱动程序之间共享。Spark支持两种类型的变量:1)广播变量(broadcast variables),可以当做是所有节点内存中的一个缓存值;2)累加器(Accumulators),这类变量只允许“加减”,如同计数器。
本篇指引描述了Spark支持的三种语言编程方式,包括Scala、Java和Pyhon,为了方便阅读,笔者将其拆成多篇文章,其中Python版本暂不翻译。本篇为Scala版,由于文章较长,上下两篇。
如果你要跟着编程指引进行学习,建议你使用命令行交互方式启动Spark(对于Scala,使用Spark-shell命令,对于Python,使用pyspark命令),或者在IDE编写代码进行学习。
引入Spark
Spark2.1.0默认与Scala2.11一起发布,当然也可以和其他版本的Scala一起工作。为了保证程序能够正常执行,你最好使用一个Scala兼容版本(例如:2.11.X,应该谨慎选择jdk和scala的对应版本,例如2.12版本的scala只支持java8,较老的scala版本兼容jdk6+,而java9还不支持使用scala,参考scala)。
笔者安装的Spark版本为2.1.0,使用的scala版本为2.11.7,jdk为7.0。
开始编写Spark应用程序前,你需要先创建一个工程,然后引入相关的依赖,可以使用sbt构建你的工程,笔者在IntelliJ中安装了scala插件,然后基于sbt新建了一个工程,如图:
![](https://img.haomeiwen.com/i3324330/3de86f3660228c80.png)
之后,在build.sbt中编写项目依赖以引入spark相关的jar包:
name := "scala-demo"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
在这个文件中,我们指定了scala的版本以及依赖的库,sbt会自动解析这个依赖,并从远程中央仓库中下载相应的jar包。
关于如何在IntelliJ中使用Spark,可以参考这篇博客。
注意,在IntelliJ中编写的Spark程序不能够直接提交到Spark集群上执行,但可以简单的在“local”上调试,如果需要提交到Spark集群上,应该先构建成jar包,然后使用spark-submit提交。
初始化Spark
Spark应用程序的第一步是创建一个SparkContext对象,它将用来连接Spark集群。SparkContext的创建需要一个SparkConf对象作为参数,这个对象包含了应用程序的详细信息。
一个JVM只能使用一个SparkContext,在创建新的SparkContext之前,你需要调用stop()方法来停止活跃着的SparkContext。下面是一个创建实例:
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
在这个例子中,appName指应用程序的名称,它将展示在集群的webUI上。master指的是Spark集群的地址(可以是三种运行模式之一,Spark独立模式,Mesos模式和YARN模式),或者直接用“local”表示本地的Spark。实践中,你可能不会直接把master硬编码在代码中,而是使用spark-submit命令时实时指定,但是如果是本地测试和单元测试,传递“local”以在本地运行可以提高效率。
使用Shell命令行
在Spark的shell环境中,已经默认为你创建了一个SparkContext,变量名为“sc”,你不必重新创建,事实上重新创建的SparkContext也无法工作。你可以在执行命令时使用--master选项配置master的地址,例如:
./bin/spark-shell --master local[4]
或者,使用--jars选项将jars文件添加的classpath中,多个jars文件之间使用英文逗号分隔,例如:
./bin/spark-shell --master local[4] --jars code.jar,code2.jar
甚至你可以使用--packages选项直接添加依赖库的Maven坐标,多个依赖库之间使用逗号分隔,如果有附加的代码仓库,可以使用--repositories引入。例如:
./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
关于spark-shell脚本,可以在笔者翻译的《Spark指南》三、 提交应用程序这篇文章中找到更多的讲解。
弹性分布式数据集(RDDs)
Spark围绕着一个称为弹性分布式数据集(RDD)的虚拟概念来操作数据,这是一个可以并行操作的可容错的元素集合。 有两种方法来创建RDD:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统,HDFS,HBase或提供Hadoop InputFormat的任何数据源。
并行集合
可以通过以内存一个已存在的集合为数据集(Scala Seq),然后调用SparkContext的parallelize方法创建一个并行集合。该集合的数据将被拷贝到集群中,成为一个分布式数据集。例如,
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建成功,这个分布式数据集就可以被并行操作。例如,我们可以调用distData.reduce((a,b) => a+b)来计算该数据集的累加和。
并行集合的一个重要参数是数据集在集群中被拆分成的分区数,Spark将会为每个分区单独创建一个task,通常,可以为集群中的每个CPU分配2~4个分区,如果不加指定,Spark会根据你集群中的配置自动设置分区数。如果你想手动指定,只要在parallelize函数中传递参数,例如sc.parallelize(data, 10)。注意,代码中的一些地方使用术语slice(分区的同义词)来保持向后兼容性。
外部数据集
Spark可以从任意Hadoop支持的数据源中创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持文本文件,SequenceFiles 和其他任意的Hadoop InputFormat。
文本文件的RDDs可以通过SparkContext的textFile方法进行创建,这个方法使用文件的URI作为参数(例如,本地文件路径,hdfs://,s3n:// 等URI),文件被创建后,将被构造成一个集合,每一行作为一个元素。下面是一个调用例子:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
一旦创建成功,就可以在distFile上使用各种数据集的操作,例如,可以通过下面的代码统计文件的字符数:
scala> distFile.map(s => s.length).reduce((a, b) => a + b)
在Spark中读取文件应该注意如下几个事项:
- 如果使用本地文件系统的路径,并提交到集群,该文件必须同时在工作节点中存在,可以拷贝文件到各个工作节点,或者使用基于网络的共享文件系统。
- 所有基于文件的输入方法,包括textFile方法,都支持以目录、压缩文件和通配符指定的文件,例如可以创建textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")。
- textFile方法支持可选的参数,用于控制文件的分区数。默认情况下,Spark给每个文件块创建一个分区(HDFS中,文件块的默认大小为128M),你可以传递一个其他的数值来指定分区数的大小。注意,自己指定时,分区数不可以小于文件块数。
除了文本文件,Spark的Scala API也支持其他的一些数据格式:
- SparkContext.wholeTextFiles 允许你读取一个包含多个小文件的目录,然后以(filename, content) pairs返回。这个方法和textFile不一致,textFile返回的格式中,每一行是一条记录。
- 对于其他Hadoop InputFormats的文件,你可以使用SparkContext.hadoopRDD方法,该方法接受任意的JobConf和输入格式类,键类和值类。使用时,与使用输入源的Hadoop作业相同的方式进行设置。 你还可以使用SparkContext.newAPIHadoopRDD,用于创建基于“新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats。
- RDD.saveAsObjectFile和SparkContext.objectFile方法支持以包含序列化Java对象的简单格式保存RDD。 它提供了一种保存RDD的简单方法。
RDD操作
RDDs支持两种操作:1)transformations,这类操作对一个已存在的RDD进行转换操作,然后产生新的RDD;2)actions,这类操作对一个数据集进行计算,然后返回一个值给驱动程序。例如,map函数就是一个transformation方法,它以把一个数据集的每个元素传递个一个开发者定义的函数,然后返回一个经过处理的新的RDD。而reduce函数就是一个action,它聚集一个RDD里的所有元素,对其进行某个操作(同样由开发者定义),然后给驱动程序返回最终的结果。
Spark中的所有transformations都是延时的,即它们不会立刻计算出结果, 相反,他们只记住应用于一些基础数据集(例如文件)的transformations。 仅当一个action需要将结果返回到驱动程序时才会计算对应的transformation。 这种设计使Spark能够更高效地运行。
默认情况下,每一个transformed RDD在运行一个action时都会重新进行计算,但是,你可以使用Spark提供的persist或cache方法将一个RDD保留在集群机器的内存中,这样下次就可以更快的访问它们。Spark还支持在磁盘上持久化存储RDD,或者在多个节点上复制RDD。
RDD基础
为了说明RDD入门,参考如下简单的程序:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行以外部文件定义了一个基本的RDD。此时lines仅仅是指向文件的一个指针,还未导入内存。第二行对这个数据集进行了map transformation,提交的函数用于计算每一行的字符长度,此时,由于Spark的lazy特性,还未立刻进行计算。第三行,执行了一个reduce action,函数是用来累加多行的字符长度和,此时,Spark将计算任务拆成多个task以在多个独立的机器上执行,在每台机器上都只对应的对拷贝到本地的数据子集进行map和reduce操作,然后将结果返回给驱动程序。
如果我们希望下次继续使用lineLengths,我们需要在reduce方法前添加如下代码:
lineLengths.persist()
使用本方法后,lineLengths的结果将在第一次计算后保留在节点的内存中。
传递Functions给Spark
Spark上的计算强烈依赖于提交给它的function,编写代码时,推荐一下两种方式:
- 匿名函数语法,推荐在较短的代码中使用。
- 以一个全局单例的静态方法提交,例如,以你可以像下面这样定义object MyFunctions,然后将MyFunctions.func1作为参数传递:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意,虽然Spark允许把一个类示例的方法引用作为Function(与单例object方式相反),但是这种方式需要把包含该类的对象和方法发送到集群中。例如下面这种方式的调用:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
如果我们创建了一个MyClass实例,然后调用该实例的doStuff方法,由于map操作引用了该实例的func1方法,因此整个实例都需要被发送到集群中(维持中间状态)。执行 rdd.map(x => this.func1(x)) 也是类似的效果。
相似的,访问外部对象的字段,也会间接引用整个对象,例如:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
上面这个Function相当于 rdd.map(x => this.field + x) 。为了避免这个问题,最简单的方式是把字段拷贝成一个局部变量,而不是引用外部的字段。例如:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解闭包
Spark的学习难点是当代码在跨集群执行时,理解变量和方法的作用域和生命周期。RDD对一些超出作用域的变量进行的操作,会给开发者造成一些困惑,下面通过一个实际的例子来仔细阐述这些细节。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: 不要这样做
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
上面的这个例子如果在不同的JVM中执行,将会产生不同的结果,例如它在local模式和集群模式下的运行结果将会不同,我们进一步解释这个原因。
本地模式&集群模式
Spark在执行作业时,会把对RDD的操作拆分成多个子任务,每个子任务都会有一个执行器。在执行器执行代码之前,Spark会计算该子任务的闭包,所谓的闭包,就是在操作RDD时(本例子中的操作即foreach())所需要的变量和方法必须对执行器可见。该闭包被序列化后发送到每一个执行器上。
闭包中的变量在发送给执行器时会被拷贝,因此当变量counter在foreach函数中被引用时,已经不是驱动器节点上的counter变量了。虽然驱动器节点的内存中也有一个counter变量,但这个变量对其他的执行器是不可见的,执行器只能看见序列化后的闭包拷贝的副本。于是,这段代码执行的最后结果仍然是0,因为执行器节点执行的操作都是在它们序列化后的couter变量上。
而在local模式下,foreach函数的执行器与驱动程序使用的是同一个JVM,因此它们将会引用同一个counter变量,更新操作也能够正常执行。
如果想要正常执行,建议开发者使用Accumulator
(在《Spark指南》四、编程指引-Scala篇(下)一文中将详细描述)。Spark中的Accumulators累加器提供了一种在集群中更新变量的安全方式。
通常,闭包中构造的循环或者本地方法,不应该被用于更改某些全局的变量。Spark无法保证闭包之外对这些引用对象的更改行为,有一些代码可以在local模式下正常工作,但这仅仅是巧合,一旦到了分布式模式下,可能这些代码就无法工作了。如果需要一些全局的聚合,建议使用Accumulator。
打印RDD的元素
另一个常见的习惯是使用rdd.foreach(println) 或者 rdd.map(println)来尝试打印一个RDD的输出。在单机模式下,所有的元素都可以正常的被打印出来,但是在集群模式中,所有的元素都被打印到执行器的控制台中,而不是在驱动程序的机器上。如果想达到这个目的,你可以先使用collect()方法将所有元素聚集到驱动机器上,然后执行print操作,例如:rdd.collect().foreach(println)。然而,这可能会使驱动机器产生OOM(内存溢出)错误,因为这一操作会将整个RDD集合都提取到同一台机器上。如果你只是想打印少数元素,建议你使用诸如rdd.take(100).foreach(println)的代码。
备注:鉴于篇幅太长,此篇文章拆成两篇来翻译,下一篇请参考笔者文集中的《Spark指南》四、编程指引-Scala篇(下)。