Two Ways to Create an RDD, and Things to Watch Out For

2019-05-03  喵星人ZC

一、parallelizing an existing collection in your driver program
The first way is to get an RDD by parallelizing an existing collection in the driver program.

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

As you can see, calling SparkContext's parallelize turns a collection into a ParallelCollectionRDD. We can then run a sum over this RDD:

scala> distData.reduce((a, b) => a + b)
res0: Int = 15
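For reference, parallelize also accepts an optional second argument that sets the number of partitions (a minimal sketch; the value 4 is just illustrative, and the output is omitted):

scala> val distData4 = sc.parallelize(data, 4)   // cut the collection into 4 partitions
scala> distData4.getNumPartitions                // should return 4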

二、referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

The second way is to get an RDD from an external storage system; this is the approach commonly used in production.

scala> val rdd = sc.textFile("/home/hadoop/soul/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = /home/hadoop/soul/data/wordcount.txt MapPartitionsRDD[2] at textFile at <console>:24

or

scala> val rdd = sc.textFile("file:///home/hadoop/soul/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/soul/data/wordcount.txt MapPartitionsRDD[4] at textFile at <console>:24

or

scala> val rdd = sc.textFile("hdfs://hadoop000:8020/g6/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop000:8020/g6/data/wordcount.txt MapPartitionsRDD[6] at textFile at <console>:24
--------------------------------------------------------------------------------------------------
scala> rdd.collect
res1: Array[String] = Array(spark       hadoop  hadoop, hive    hbase   hbase, hive     hadoop  hadoop, hive    hadoop  hadoop)
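textFile likewise takes an optional minPartitions argument that sets a lower bound on the number of partitions (a minimal sketch reusing the HDFS path above; output omitted):

scala> val rdd4 = sc.textFile("hdfs://hadoop000:8020/g6/data/wordcount.txt", 4)   // ask for at least 4 partitions
scala> rdd4.getNumPartitions                                                       // >= 4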

三、Notes

SparkContext's textFile method also supports directories, compressed files, and wildcards, as shown in the sketch below.
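For example (a minimal sketch; these paths are hypothetical, modeled on the directory used above, and the output is omitted):

scala> val dirRdd  = sc.textFile("/home/hadoop/soul/data/")                    // every file in the directory
scala> val gzRdd   = sc.textFile("/home/hadoop/soul/data/wordcount.txt.gz")    // compressed files are read transparently
scala> val globRdd = sc.textFile("/home/hadoop/soul/data/*.txt")               // wildcard expansion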
