Spark学习笔记

3.Spark学习(Python版本):Spark RDD编程基

2018-08-11  本文已影响51人  马淑

Spark中针对RDD的操作包括创建RDD、RDD转换操作和RDD行动操作。

Step1. 启动HDFS和Spark
mashu@mashu-Inspiron-5458:~$ cd /usr/local/hadoop
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./sbin/start-dfs.sh
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ cd /usr/local/spark
mashu@mashu-Inspiron-5458:/usr/local/spark$ ./bin/pyspark
Step2. 新建rdd文件夹以存储本案例资源
mashu@mashu-Inspiron-5458:~$ cd /usr/local/spark/python_code
mashu@mashu-Inspiron-5458:/usr/local/spark/python_code$ mkdir rdd

在rdd目录下新建一个word.txt文件,你可以在文件里面随便输入几行英文语句用来测试。

Step3-1. RDD创建 - 从本地创建

RDD可以通过两种方式创建:

从本地创建方法:

>>> line = sc.textFile('file:///usr/local/spark/python_code/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD' 
Step3-2. RDD创建 - 从HDFS创建

在HDFS用户根目录下创建rdd文件夹,将本地的word_rdd.txt上传到这里。

mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -mkdir /user/mashu/rdd
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls .
Found 2 items
drwxr-xr-x   - mashu supergroup          0 2018-08-11 15:09 rdd
-rw-r--r--   1 mashu supergroup         41 2018-08-11 14:06 word.txt
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -put /usr/local/spark/python_code/rdd/word_rdd.txt ./rdd/
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls ./rdd
Found 1 items
-rw-r--r--   1 mashu supergroup         65 2018-08-11 15:13 rdd/word_rdd.txt

在HDFS中创建RDD时,以下3个命令等价:
line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
line = sc.textFile('./rdd/word_rdd.txt')
line = sc.textFile('/user/mashu/rdd/word_rdd.txt')

>>> line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
>>> line = sc.textFile('./rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
>>> line = sc.textFile('/user/mashu/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
Step3-3. RDD创建 - 通过并行集合(数组)创建RDD

可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

>>> nums = [1,2,3,4,5]
>>> rdd = sc.parallelize(nums)
>>> rdd.collect()
[1, 2, 3, 4, 5]
Step4. RDD操作

RDD被创建好以后,在后续使用过程中一般会发生两种操作:

每一次RDD转换操作都会产生不同的RDD,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

Step4-1. RDD操作 - 转换操作
一些常见的转换操作(Transformation API):
Step4-2. RDD操作 - 行动操作
一些常见的行动操作(Action API):
Step4-3. RDD操作 - 实例

filter();collect()

>>> lines = sc.textFile('./rdd/word_rdd.txt')
>>> lines.filter(lambda line: 'Spark' in lines).count()
3
>>> lines.collect()
['I am learning Spark RDD', 'Spark is faster than Hadoop', 'I love Spark']

map();reduce()

找出文本文件中单行文本所包含的单词数量的最大值

>>> lines = sc.textFile('./rdd/word_rdd.txt')
>>> lines.map(lambda line:len(line.split(' '))).reduce(lambda a,b:)
5

p.s.对三目运算 a>b and a or b的逻辑理解不了

Step5. 持久化

RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果我们需要多次调用不同的行动操作,都会触发一次从头开始的计算。特别对于迭代计算而言,代价很大。

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) #行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) #行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive

上面代码执行过程中,前后共触发了两次从头到尾的计算。
实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化。具体操作如下:

>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()  #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:175
>>> print(rdd.count()) #第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3
>>> print(','.join(rdd.collect())) #第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
Step6. 分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区。
对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:

>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array,2) #设置两个分区

对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。

Step7. 打印元素
本地:

rdd.foreach(print)或者rdd.map(print)

集群上:

采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法:
rdd.collect().foreach(print) #会抓取各个worker节点上的所有RDD元素,这能会导致内存溢出
rdd.take(100).foreach(print) #用于只需要打印RDD的部分元素时

上一篇下一篇

猜你喜欢

热点阅读