4.Spark学习(Python版本):键值对RDD
2018-08-11 本文已影响80人
马淑
虽然RDD中可以包含任何类型的对象,但是“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到。
Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。
Step1-1. 键值对RDD的创建 - 从文建中加载使用map()实现
>>> lines = sc.textFile('./rdd/word_rdd.txt')
>>> lines.first()
'I am learning Spark RDD'
>>> pairRDD = lines.flatMap(lambda line:line.split(' ')).map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('I', 1)
('love', 1)
('Spark', 1)
('I', 1)
('am', 1)
('learning', 1)
('Spark', 1)
('RDD', 1)
('Spark', 1)
('is', 1)
('faster', 1)
('than', 1)
('Hadoop', 1)
Step1-2. 键值对RDD的创建 - 通过并行集合(列表)创建RDD
>>> list = ['Hadoop','Spark','Hive','Spark']
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Spark', 1)
('Hive', 1)
Step2. 常用的键值对转换操作
-
reduceByKey()
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。
>>> list = ['Hadoop','Spark','Hive','Hadoop','Kafka','Hive','Hadoop']
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
[Stage 0:> (0 + 4) / 4]('Kafka', 1)
('Spark', 1)
('Hadoop', 3)
('Hive', 2)
-
groupByKey()
groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
>>> pairRDD.groupByKey()
PythonRDD[10] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fcbd6448a58>)
('Hive', <pyspark.resultiterable.ResultIterable object at 0x7fcbcb74d9b0>)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fcbcb74d9b0>)
('Kafka', <pyspark.resultiterable.ResultIterable object at 0x7fcbcb74dba8>)
>>>
-
keys(), values
跟python中字典类型类似。
>>> pairRDD.keys().foreach(print)
Spark
Hive
Hive
Hadoop
Hadoop
Hadoop
Kafka
>>> pairRDD.values().foreach(print)
1
1
1
1
1
1
1
-
sortByKey()
sortByKey()的功能是返回一个根据键排序的RDD。
>>> pairRDD.sortByKey().foreach(print)
('Kafka', 1)
('Hadoop', 1)
('Hadoop', 1)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Hive', 1)
-
mapValues(func)
我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func)
。
>>> pairRDD.mapValues(lambda x:x+1).foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Hadoop', 2)
('Kafka', 2)
('Hive', 2)
('Hadoop', 2)
-
join()
类似数据库中内连接的概念。
>>> pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])
>>> pairRDD2 = sc.parallelize([('spark','fast')])
>>> pairRDD1.join(pairRDD2).foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))
>>>
-
cogroup()