Spark 2. RDDs 并行集合 外部数据集
可伸缩的分布式数据集 Resilient Distributed Datasets (RDDs)
原文地址: http://spark.apache.org/docs/latest/programming-guide.html
仅限交流使用,转载请注明出处。如有错误,欢迎指出
Henvealf/译
Spark 的核心概念就是 RDD,是一个容错性极高的元素集合,他可以使用在并行的操作中。这里有两种创建 RDDs 的方式:
-
在你的程序中并行存在的集合。
-
引用外部存储系统中的数据集,比如共享文件系统,HDFS,HBase,或者 Hadoop InputFormat 提供的其他数据源。
并行的数据集(Parallelized Collections)
Scala
并行的数据集的创建是通过调用 SparkContext 的 paralleize 方法完成的,该方法的参数是你程序中的一个集合( 一个 Scala Seq )。 这个集合中的元素就会被拷贝成一个分布式的能够被并行操作的数据集。举个例子:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
创建之后,我们就可以调用 distData.reduce((a, b) => a + b),来计算数组中元素的总和。之后我们试一试在集群上执行这个操作。
对于并行数据集,一个重要的属性(参数)就是分割数据集后的分区(partition)数目。Spark 将会在集群上为每一个分区运行一个 task。经典情况下,你会为你集群上的每个 CPU 提供 2-4 个分区。一般情况下,分区数会根据你的集群状况自动决定。当然,你也可以手动的指定他,就使用 parallelize 方法的的第二个参:
sc.parallelize(data, 10)
注意:有些地方会把 分区(partition)叫做 分片(slices),他们是相通的。
Java 代码
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
外部数据集 (External Datasets)
Scala
Spark 能够从任何 Hadoop 支持的存储源中创建分布式数据集,包括你本地文件系统, HDFS, HBase,Cassandra,Amazon S3。
Spark 支持 文本文件, SequenceFiles 或者其他的 Hadoop InputForm。
可以使用 SparkContext 的 textFile 方法创建一个文本文件的 RDD, 方法得到一个文件的 URI,会把他读成行级别的集合。例子:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
一旦创建成功, distFile 就能够被数据集操作来使用,比如计算所有行的字符个数:
distFile.map(s => s.length).reduce((a, b) => a + b)
使用 Spark 读取文件的几点注意事项:
-
如果使用本地文件系统路径,在工作节点中的相同路径的文件必须同样拥有可存取权限。要么将文件拷贝到所有的工作节点上,要么将文件放在网络挂载的共享文件系统中。
-
所有以文件为基础输入的方法,包括 textFile, 都支持在目录,压缩文件和通配符上运行。比如:textFile("/my/directory"), textFile("/my/directory/.txt"), 和 textFile("/my/directory/.gz").
-
textFile 也有第二个可选的参数,指明文件分区的个数。默认情况下,一个分区对应于 HDFS 中的一个块(128MB),当然,你也可以设置分区的个数比块多。注意,你的分区数量不可能比块少。
除了 文本文件, Spark 的 Scala API 还支持其他几种数据格式:
-
SparkContext.wholeTextFiles 能让你读取一个包含了多个文本文件的目录,他返回的是 (filename,content)对。可以与 textFile 的一行数据一条记录进行类比。
-
SequenceFiles, 使用 SparkContext 的 sequenceFile[K,V] 方法, K 为 键的类型,V 为 值的类型,他们应该是是 Hadoop 中 Writable类 的之类。初次之外, Spark 允许你将一些普通的 Writable 类型指定成他的基本类型。比如 sequenceFile[Int, String] 将会自动读成 IntWritable 与 Text。
-
对于其他的 Hadoop InputFormat,你可以使用 SparkContext.hadoopRDD 方法,他可以让你得到任意的 JobConf 和 input format 的类, key 的类, value 的类。设置他们的一种方式就是在带有你输入资源的 Hadoop Job 中设置。你也可以使用 SparkContext.newAPIHadoopRDD 来基于 “新的” mapreduce API (org.apache.hadoop.mapreduce)来设置。
-
RDD.saveAsObject 和 SparkContext.objectFile 使用一个由序列化了的 Java 对象组成的简单格式来保存一个RDD。他不像 Avro 那样高效,他只是提供一个保存任何 RDD 的简单方式。
Java 文档在此: http://spark.apache.org/docs/latest/programming-guide.html#external-datasets