大数据,机器学习,人工智能大数据大数据&云计算

Spark08 RDD KV数据

2020-05-31  本文已影响0人  山高月更阔

键值对 RDD 是 Spark 中许多操作所需要的常见数据类型。Spark 为包含简直对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。

创建 Pair RDD

在 Spark 中有很多种方法创建 Pair RDD。很多存储键值对的数据格式会在读取值时直接返回其键值对数据组成的 pair RDD。普通的 RDD 可通过 map 转换为 pair RDD。

构建键值对 RDD 的方法在不同语言中会有所不同。在 Python 中,需要返回一个二元组组成的 RDD

>>> rdd=sc.parallelize(['zhang 1','li 2'])
>>> pairs = rdd.map(lambda x : (x.split(' ')[0],x.split(' ')[1]))
>>> pairs.collect()
[('zhang', '1'), ('li', '2')]

Pair RDD 的转换操作

以键值对集合[(1,2),(3,4),(3,6)] 为例

函数名 目的 实例 结果
                                 
reduceByKey                                                                       
合并具有相同键的值 rdd.reduceByKey(lambda x,y : x + y) [('3', '46'), ('1', '2')]
groupByKey 对具有相同键值进行分组 rdd.groupByKey() [(1,[2]),(3,[4,6])]
combineByKey 使用不同的返回类型合并具有相同的键的值
mapValues(func) 对 pari RDD 的每个值应用一个函数而不改变键 rdd.mapValues(lambda x : x + 1) [(1, 3), (3, 5), (3, 7)]
flatMapValues 对 Pair RDD 中的每个值应用一个返回迭代器的函数,然后对每个元素都生成一个对应原键的键值对记录。 rdd.flatMapValues(lambda x : list(range(int(x)))) [('1', 0), ('1', 1), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 4), ('3', 5)]
keys 返回一个仅包含键的 RDD rdd.keys() ['1', '3', '3']
values 返回一个仅包含值的 RDD rdd.values() ['2', '4', '6']
sortByKey 返回一个根据键排序的 RDD rdd.sortByKey() [('1', '2'), ('3', '4'), ('3', '6')]

针对两个 pair RDD 的转化操作

rdd = [(1,2),(3,4),(3,6)]
other = [(3,9)]

函数名 目的 实例 结果
                                 
subtractByKey                                                                        
删除 rdd 中键与 other 相同的键相同的元素 rdd.subtractByKey(other) [(1, 2)]
join 对两个 RDD 进行内连接 rdd.join(other) [(3, (4, 9)), (3, (6, 9))]
rightOuterJoin 对两个 RDD 进行右连接 rdd.rightOuterJoin(other) [(3, (4, 9)), (3, (6, 9))]
leftOuterJoin 对两个 RDD 进行左连接 rdd.leftOuterJoin(other) [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
rdd.cogroup 将两个 RDD 相同的键分组到一起 rdd.cogroup(other) [1,([2],[]),(3,([4,6],[9]))]

行动操作

例如:rdd = [(1,2),(3,4),(3,6)]

函数名 目的 实例 结果
countByKey 对每个键进对应的元素分别计数 rdd.countByKey() [(1, 1), (3, 2)]
collectAsMap 将结果以映射表的形式返回, rdd.collectAsMap() {1:2,3:6}
lookup 返回给定键的所有值 rdd.lookup(3) [4,6]

其他操作

键值对 RDD 也是 RDD 支持 RDD 所有算子 键值对 RDD 都支持
举例
Pyhton

>>> rdd = sc.parallelize([(1,2),(3,4),(3,6)])
# 筛选第二个元素大于3的元素
>>> res_rdd = rdd.filter(lambda kv: kv[1] > 3)
>>> res_rdd.collect()
[(3, 4), (3, 6)]

上述图标只是简单说明 键值对 RDD 的一些操作 下面通过一些实例来进一步说明其中一些操作。有些操作比较简单易懂,所以举例不会涵盖所有操作。

求平均值

给定一下数据求平均值,会设计 mapValues 和 reduceByKey

key value
a 1
b 0
b 3
c 2
d 6
d 9
d 7

通过 mapValues 转换为如下结果

key value
a (1,1)
b (0,1)
b (3,1)
c (2,1)
d (6,1)
d (9,1)
d (7,1)

value 为元组 元组第一个值位 key 对应的值 元组第二个值位 次数。要求平均值,需要就出所有值的和,和次数之和
通过 reduceByKey 求出 值之和,和次数之和
如下

key value
a (1,1)
b (3,2)
c (2,1)
d (22,3)

有了次数之和,和 value 之和 求平均值 value 之和除以次数

完整代码如下
Python

>>> rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> map_rdd = rdd.mapValues(lambda x : (x,1))
>>> result = map_add.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])
>>> result.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> result = result.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]

combineByKey 算子说明

combineByKey 算子的参数较多 在表格中没有举例用法
combineByKey 有三个参数
createCombiner: 初始化方法,如果是新元素 combineByKey 都会调用改方法来初始化,注意:这一过程会在每个分区中第一次出现各个键时发生。而不是在整个 RDD 中第一次出现一个键时发生
mergeValue: 分区内合并方法
mergeCombiners:分区间合并方法
利用 combineByKey 求平均值
Python

 rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> res_rdd = rdd.combineByKey((lambda x:(x,1)),(lambda x,y:(x[0]+y,x[1]+1)),(lambda x,y:(x[0]+y[0],x[1]+y[1])))
>>> res_rdd.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> res_rdd = res_rdd.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]

combineByKey 处理数据流程图
原数据为:

key value
a 1
b 0
b 3
c 2
d 6
d 9
d 7

假设分为3个区
分区一:

a 1
b 0
b 3

分区二:

c 2
d 6
d 9

分区三:

d 7

分区中合并间 后:
这里调用 (lambda x,y:(x[0]+y,x[1]+1)) 值相加 次数+1
如果遇到新元素会调用 (lambda x:(x,1)), 转换为 值-次数的 kv结构
分区一:

a (1,1)
b (3,2)

分区二:

c (2,1)
d (15,2)

分区三:

d (7,1)

分区见合并
调用方法: (lambda x,y:(x[0]+y[0],x[1]+y[1]))
所有值相加,次数相加

a (1,1)
b (3,2)
c (2,1)
d (22,3)

sortByKey 算子

sortByKey 两个参数
ascending: 是否结果为升序 默认True。True 为升序 False 为降序
keyfunc: 自定义排序规则
例:
Python

sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)):
上一篇下一篇

猜你喜欢

热点阅读