Spark08 RDD KV数据
键值对 RDD 是 Spark 中许多操作所需要的常见数据类型。Spark 为包含简直对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。
创建 Pair RDD
在 Spark 中有很多种方法创建 Pair RDD。很多存储键值对的数据格式会在读取值时直接返回其键值对数据组成的 pair RDD。普通的 RDD 可通过 map 转换为 pair RDD。
构建键值对 RDD 的方法在不同语言中会有所不同。在 Python 中,需要返回一个二元组组成的 RDD
>>> rdd=sc.parallelize(['zhang 1','li 2'])
>>> pairs = rdd.map(lambda x : (x.split(' ')[0],x.split(' ')[1]))
>>> pairs.collect()
[('zhang', '1'), ('li', '2')]
Pair RDD 的转换操作
以键值对集合[(1,2),(3,4),(3,6)] 为例
函数名 | 目的 | 实例 | 结果 |
---|---|---|---|
reduceByKey |
合并具有相同键的值 | rdd.reduceByKey(lambda x,y : x + y) | [('3', '46'), ('1', '2')] |
groupByKey | 对具有相同键值进行分组 | rdd.groupByKey() | [(1,[2]),(3,[4,6])] |
combineByKey | 使用不同的返回类型合并具有相同的键的值 | ||
mapValues(func) | 对 pari RDD 的每个值应用一个函数而不改变键 | rdd.mapValues(lambda x : x + 1) | [(1, 3), (3, 5), (3, 7)] |
flatMapValues | 对 Pair RDD 中的每个值应用一个返回迭代器的函数,然后对每个元素都生成一个对应原键的键值对记录。 | rdd.flatMapValues(lambda x : list(range(int(x)))) | [('1', 0), ('1', 1), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 4), ('3', 5)] |
keys | 返回一个仅包含键的 RDD | rdd.keys() | ['1', '3', '3'] |
values | 返回一个仅包含值的 RDD | rdd.values() | ['2', '4', '6'] |
sortByKey | 返回一个根据键排序的 RDD | rdd.sortByKey() | [('1', '2'), ('3', '4'), ('3', '6')] |
针对两个 pair RDD 的转化操作
rdd = [(1,2),(3,4),(3,6)]
other = [(3,9)]
函数名 | 目的 | 实例 | 结果 |
---|---|---|---|
subtractByKey |
删除 rdd 中键与 other 相同的键相同的元素 | rdd.subtractByKey(other) | [(1, 2)] |
join | 对两个 RDD 进行内连接 | rdd.join(other) | [(3, (4, 9)), (3, (6, 9))] |
rightOuterJoin | 对两个 RDD 进行右连接 | rdd.rightOuterJoin(other) | [(3, (4, 9)), (3, (6, 9))] |
leftOuterJoin | 对两个 RDD 进行左连接 | rdd.leftOuterJoin(other) | [(1, (2, None)), (3, (4, 9)), (3, (6, 9))] |
rdd.cogroup | 将两个 RDD 相同的键分组到一起 | rdd.cogroup(other) | [1,([2],[]),(3,([4,6],[9]))] |
行动操作
例如:rdd = [(1,2),(3,4),(3,6)]
函数名 | 目的 | 实例 | 结果 |
---|---|---|---|
countByKey | 对每个键进对应的元素分别计数 | rdd.countByKey() | [(1, 1), (3, 2)] |
collectAsMap | 将结果以映射表的形式返回, | rdd.collectAsMap() | {1:2,3:6} |
lookup | 返回给定键的所有值 | rdd.lookup(3) | [4,6] |
其他操作
键值对 RDD 也是 RDD 支持 RDD 所有算子 键值对 RDD 都支持
举例
Pyhton
>>> rdd = sc.parallelize([(1,2),(3,4),(3,6)])
# 筛选第二个元素大于3的元素
>>> res_rdd = rdd.filter(lambda kv: kv[1] > 3)
>>> res_rdd.collect()
[(3, 4), (3, 6)]
上述图标只是简单说明 键值对 RDD 的一些操作 下面通过一些实例来进一步说明其中一些操作。有些操作比较简单易懂,所以举例不会涵盖所有操作。
求平均值
给定一下数据求平均值,会设计 mapValues 和 reduceByKey
key | value |
---|---|
a | 1 |
b | 0 |
b | 3 |
c | 2 |
d | 6 |
d | 9 |
d | 7 |
通过 mapValues 转换为如下结果
key | value |
---|---|
a | (1,1) |
b | (0,1) |
b | (3,1) |
c | (2,1) |
d | (6,1) |
d | (9,1) |
d | (7,1) |
value 为元组 元组第一个值位 key 对应的值 元组第二个值位 次数。要求平均值,需要就出所有值的和,和次数之和
通过 reduceByKey 求出 值之和,和次数之和
如下
key | value |
---|---|
a | (1,1) |
b | (3,2) |
c | (2,1) |
d | (22,3) |
有了次数之和,和 value 之和 求平均值 value 之和除以次数
完整代码如下
Python
>>> rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> map_rdd = rdd.mapValues(lambda x : (x,1))
>>> result = map_add.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])
>>> result.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> result = result.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]
combineByKey 算子说明
combineByKey 算子的参数较多 在表格中没有举例用法
combineByKey 有三个参数
createCombiner: 初始化方法,如果是新元素 combineByKey 都会调用改方法来初始化,注意:这一过程会在每个分区中第一次出现各个键时发生。而不是在整个 RDD 中第一次出现一个键时发生
mergeValue: 分区内合并方法
mergeCombiners:分区间合并方法
利用 combineByKey 求平均值
Python
rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> res_rdd = rdd.combineByKey((lambda x:(x,1)),(lambda x,y:(x[0]+y,x[1]+1)),(lambda x,y:(x[0]+y[0],x[1]+y[1])))
>>> res_rdd.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> res_rdd = res_rdd.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]
combineByKey 处理数据流程图
原数据为:
key | value |
---|---|
a | 1 |
b | 0 |
b | 3 |
c | 2 |
d | 6 |
d | 9 |
d | 7 |
假设分为3个区
分区一:
a | 1 |
---|---|
b | 0 |
b | 3 |
分区二:
c | 2 |
---|---|
d | 6 |
d | 9 |
分区三:
d | 7 |
---|
分区中合并间 后:
这里调用 (lambda x,y:(x[0]+y,x[1]+1)) 值相加 次数+1
如果遇到新元素会调用 (lambda x:(x,1)), 转换为 值-次数的 kv结构
分区一:
a | (1,1) |
---|---|
b | (3,2) |
分区二:
c | (2,1) |
---|---|
d | (15,2) |
分区三:
d | (7,1) |
---|
分区见合并
调用方法: (lambda x,y:(x[0]+y[0],x[1]+y[1]))
所有值相加,次数相加
a | (1,1) |
---|---|
b | (3,2) |
c | (2,1) |
d | (22,3) |
sortByKey 算子
sortByKey 两个参数
ascending: 是否结果为升序 默认True。True 为升序 False 为降序
keyfunc: 自定义排序规则
例:
Python
sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)):