弹性分布式数据集(RDD)

2020-05-20  本文已影响0人  竞媒体

RDD 不仅是一组不可变的JVM对象的分布集,可以让你执行高速运算。改数据集是分布式的。基于某种关键字,该数据集被划分成块,同时分发到执行器节点。RDD将跟踪(记入日志)应用于每个块的所有转换,以加快计算速度,并在发生错误和部分数据丢失时提供回退。

内部运行方式:RDD并行操作,每个转换并行执行,从而大大提高速度。数据集转换通常是惰性的。这就意味着任何转换仅在调用数据集上的操作时才执行。

一个示例数据集:

1.统计出某一列中不同值出现的次数

2.选出以字母 t 开头的

3.将结果打印到屏幕上

import findspark

findspark.init()

from pyspark import SparkContext,SparkConf

conf = SparkConf().setAppName("wordcount")

sc =SparkContext(conf=conf)

text_example = " Hooray! It's snowing! It's time to make a snowman.James runs out. He makes a big pile of sn

ow. He puts a big snowball on top. He adds a scarf and a hat. He adds an orange for the nose. He adds coal f

or the eyes and buttons.In the evening, James opens the door. What does he see? The snowman is moving! James

invites him in. The snowman has never been inside a house. He says hello to the cat. He plays with paper to

wels.A moment later, the snowman takes James's hand and goes out.They go up, up, up into the air! They are f

lying! What a wonderful night!The next morning, James jumps out of bed. He runs to the door.He wants to than

k the snowman. But he's gone."

wordCount= sc.parallelize(text_example.split(" ")).map(lambda word:(word,1)).filter(lambda val: val[0].startswith('t')).reduceByKey(lambda a, b : a + b

)

print(wordCount.collect())

输出:[('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)]

创建RDD:

使用.parallelize(...) 集合(元素list 或 array)

data = sc.parallelize([('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)])

引用位于本地或外部的某个文件(或者多个文件)

text_file = sc.textFile("/root/workdir/charlotte.txt")

wordCount= text_file.flatMap(lambda line: line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a, b : a + b)

Schema

RDD是无schema的数据结构,可以使用任何类型的数据结构:tuple、dict、list。

data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':1000000},['Spain','visited',4504]]).collect()

可以访问对象中的数据:data_heterogeous[1]['Porsche']

.collect()方法把RDD的所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。

转换

转换可以调整数据集。包括映射、筛选、连接、转换数据集中的值。

.map(...)转换

该方法应用在每个RDD元素上

.filter(...)转换

该方法可以让你从数据集中选择元素,该元素符合特定的标准。

.flatMap(...)转换

.flatMap(...)返回一个扁平的结果,而不是一个列表。

.distinct(...)转换

该方法返回指定列中不同值的列表。

.sample(...)转换

该方法返回数据集中的随机样本。

.leftOuterJoin(...)转换

根据两个数据集中都有的值来连接两个RDD,并返回左侧的RDD记录,而右边的记录附加在两个RDD匹配的地方。

.repartition(...)转换

重新对数据集进行分区,改变了数据集分区的数量。

操作

和转换不同,操作执行数据集上的计划任务。一旦完成数据转换,则可以执行相应转换。

.take(...)方法

返回单个数据分区的前n行

.takeSample(...)

返回随机记录

.collect(...)

返回所有RDD的元素给驱动程序。

.reduce(...)方法

使用指定的方法减少RDD中的元素。

.reduceByKey(...)方法

和.reduce(...)类似,但只在键-键基础上进行。

.count(...)方法

统计RDD里的元素数量

.saveAsTextFile(...)方法

对RDD执行.saveAsTextFile(...)可以让RDD保存为文本文件:每个文件一个分区。

.foreach(...)方法

对RDD里的每个元素,用迭代的方法应用相同的函数。

总结:RDD是无schema的数据结构,是Spark的核心。Spark中的转换是惰性的,它们只在操作被调用是执行。

上一篇下一篇

猜你喜欢

热点阅读