spark RDD
job由stage构成,stage由task构成。
job:一个action就是一个job
job-划分->stage:当遇到宽依赖,则划分一个stage。
stage-划分->task:task对等partition概念。
Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的.
RDD:不可变、只读的,可被分区的数据集
RDD特性:
- 只读:不能修改,只能通过转换操作生成新的 RDD。
- 分布式:可以分布在多台机器上进行并行处理。
- 弹性:计算过程中内存不够时它会和磁盘进行数据交换。
- 基于内存:可以全部或部分缓存在内存中,在多次计算间重用。
图 1 RDD 分区及分区与工作节点的分布关系
RDD基本操作:transformation + action
transformation:惰性、实际没有执行、直到action操作才真正运行
表 1 RDD转换操作(rdd1={1, 2, 3, 3},rdd2={3,4,5})
- 函数名 作用 示例 结果
- map() 将函数应用于 RDD 的每个元素,返回值是新的 RDD rdd1.map(x=>x+l) {2,3,4,4}
- flatMap() 将函数应用于 RDD 的每个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD rdd1.flatMap(x=>x.to(3)) {1,2,3,2,3,3,3}
- filter() 函数会过滤掉不符合条件的元素,返回值是新的 RDD rdd1.filter(x=>x!=1) {2,3,3}
- distinct() 将 RDD 里的元素进行去重操作 rdd1.distinct() (1,2,3)
- union() 生成包含两个 RDD 所有元素的新的 RDD rdd1.union(rdd2) {1,2,3,3,3,4,5}
- intersection() 求出两个 RDD 的共同元素 rdd1.intersection(rdd2) {3}
- subtract() 将原 RDD 里和参数 RDD 里相同的元素去掉 rdd1.subtract(rdd2) {1,2}
- cartesian() 求两个 RDD 的笛卡儿积 rdd1.cartesian(rdd2) {(1,3),(1,4)……(3,5)}
action操作:行动操作接受 RDD,但是返回非 RDD,即输出一个值或者结果
- collect() 返回 RDD 的所有元素 rdd.collect() {1,2,3,3}
- count() RDD 里元素的个数 rdd.count() 4
- countByValue() 各元素在 RDD 中的出现次数 rdd.countByValue() {(1,1),(2,1),(3,2})}
- take(num) 从 RDD 中返回 num 个元素 rdd.take(2) {1,2}
- top(num) 从 RDD 中,按照默认(降序)或者指定的排序返回最前面的 + num 个元素 rdd.top(2) {3,3}
- reduce() 并行整合所有 RDD 数据,如求和操作 rdd.reduce((x,y)=>x+y) 9
val wordCounts = textFile.flatMap(line => line.split(" ")) //flatmap: 将一行文本 split 成多个单词
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
- fold(zero)(func) 和 reduce() 功能一样,但需要提供初始值 rdd.fold(0)((x,y)=>x+y) 9
- foreach(func) 对 RDD 的每个元素都使用特定函数 rdd1.foreach(x=>printIn(x)) 打印每一个元素
- saveAsTextFile(path) 将数据集的元素,以文本的形式保存到文件系统中 rdd1.saveAsTextFile(file://home/test)
- saveAsSequenceFile(path) 将数据集的元素,以顺序文件格式保存到指 定的目录下 saveAsSequenceFile(hdfs://home/test)
- groupByKey() 只是将输入的tuple按照tuple[0]进行分组,将tuple[1]堆成一个sequence,groupByKey本身不能自定义操作函数。 例如:
(python)
data = [('tom',90),('jerry',97),('luck',92),('tom',78),('luck',64),('jerry',50)]
rdd = sc.parallelize(data)
print rdd.groupByKey().map(lambda x: (x[0],list(x[1]))).collect()
# 输出:
[('tom', [90, 78]), ('jerry', [97, 50]), ('luck', [92, 64])]
- reduceByKey(func) : 要把分布在集群各个节点上的数据中的同一个key,对应的values,都给 集中到一个节点的一个executor的一个task中,对集合起来的value执行传入的函数进行 reduce操作,最后变成一个value---这属于spark调优的shuffle优化,这样就避免了shuffle,提高了spark执行的效率.
例如:tupleRDD.reduceByKey((x,y)=>x+y)
// reduceByKey,按照相同的key进行reduce操作
List<String> list = Arrays.asList("key1", "key1", "key2", "key2", "key3");
JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
//转为key-value形式
JavaPairRDD<String, Integer> pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
List list1 = pairRDD.reduceByKey((x, y) -> x + y).collect();
System.out.println(list1)
RDD依赖类型:血缘关系的依赖分为窄依赖和宽依赖。
窄依赖是指父 RDD 的每个分区 最多 会被1个子 RDD 的分区所使用。
宽依赖是指父 RDD 的每个分区 会被多个子分区所依赖。
map、filter、union 等操作是窄依赖,而 groupByKey、reduceByKey 等操作是宽依赖。
join 操作有两种情况,如果 join 操作中使用的每个 Partition 仅仅和固定个 Partition 进行 join,则该 join 操作是窄依赖,其他情况下的 join 操作是宽依赖。
所以可得出一个结论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖,也就是说,对父 RDD 依赖的 Partition 不会随着 RDD 数据规模的改变而改变。
-
窄依赖
1)子 RDD 的每个分区依赖于常数个父分区(即与数据规模无关)。
2)输入输出一对一的算子,且结果 RDD 的分区结构不变,如 map、flatMap。
3)输入输出一对一的算子,但结果 RDD 的分区结构发生了变化,如 union。
4)从输入中选择部分元素的算子,如 filter、distinct、subtract、sample。 -
宽依赖
1)子 RDD 的每个分区依赖于所有父 RDD 分区。
2)对单个 RDD 基于 Key 进行重组和 reduce,如 groupByKey、reduceByKey。
3)对两个 RDD 基于 Key 进行 join 和重组,如 join。
Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。RDD 通过血缘关系记住了它是如何从其他 RDD 中演变过来的。当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,从而带来性能的提升。
相对而言,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可,而不需要重新计算父 RDD 的所有分区。而对于宽依赖来讲,单个结点失效,即使只是 RDD 的一个分区失效,也需要重新计算父 RDD 的所有分区,开销较大。