大数据,机器学习,人工智能玩转大数据大数据

2019-08-05

2019-08-05  本文已影响2人  c062197eecd2

Spark core Insight

1.深入理解 RDD 的内在逻辑

  1. 能够使用 RDD 的 算子
  2. 理解 RDD 算子的 Shuffle 和 缓存
  3. 理解 RDD 整体的使用流程
  4. 理解 RDD 的调度原理
  5. 理解 Spark 中常见的分布式变量共享方式

1. 深入 RDD

1.1 案例

需求
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .sortBy(item => item._2, false)
  .take(10)

result.foreach(item => println(item))
1. 假设要针对整个网站的历史数据进行处理,数据量有1T,如何处理?
2. 如何放在集群中运行?
简单来讲,并行计算就是同时使用多个计算资源解决同一个问题, 有四个要点:
3. 如果放在集群中的话,可能要对整个计算任务进行分解, 那么应该如何分解任务呢?
概述:
扩展
移动数据不如移动计算,这是一个基础的优化,如何做到?
5. 在集群中运行,需要很多节点之间配合,出错的概率也更高,出错了怎么办?

假设 RDD1 RDD2 RDD3 在转换的过程中,RDD2 出错了,有两种办法可以解决:

  1. 缓存 RDD2 的数据,直接回复 RDD2 ,类似 HDFS 上的备份机制。
  2. 记录 RDD2 的依赖关系,通过其父类的 RDD 来回复 RDD2 ,这种方式会减少很多的数据交互和保存的磁盘空间。
如何通过父级 RDD 来 恢复?
  1. 记录 RDD2 的父亲是 RDD1
  2. 记录 RDD2 的计算函数, 例如 记录(下面就是一个计算函数):
RDD2 = RDD1.map(…​), map(…​) 
  1. 当 RDD2 计算出错的时候, 可以通过 父级 RDD 和计算函数来恢复 RDD2
6. 假如任务特别的复杂,流程特别的长,有很多的 RDD 之间的依赖关系,如何优化呢?
在 RDD 中有两个手段可以做到
  1. 缓存
  2. CheckPoint

1.2 再谈 RDD

1.2.1 RDD 为什么会出现?

在 RDD 出现之前,当时 MapReduce 是比较主流的,而 MapReduce 如何执行迭代计算的任务呢?
RDD 如何解决迭代计算非常低效的问题的呢?
Job3 = (Job1.map).filter
// 线性回归
val points = sc.textFile(...)
   .map(...)
   .persist(...)
val w = randomValue
for (i <- 1 to 10000) {
   val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
       .reduce(_ + _)
   w -= gradient
}

1.2.3 RDD 的特点

RDD 不仅是数据集,也是编程模型
RDD 的算子大致分为两类:
RDD 可以分区
RDD 是只读的

RDD 是可以容错的


RDD 的容错有两种方式

1.2.3 什么叫做弹性分布式数据集

分布式
弹性
数据集

总结: RDD 的 五大属性

首先整理一下上面所提到的 RDD 所要实现的功能:
  1. RDD 有分区
  2. RDD 要可以通过依赖关系和计算函数进行容错
  3. RDD 要针对数据本地性进行优化
  4. RDD 支持 MapReduce 形式的计算, 所以要能够对数据进行 Shuffled
对于 RDD 来说, 其中应该有什么内容呢? 如果站在 RDD 设计者的角度上, 这个类中, 至少需要什么属性?

2. RDD 的 算子

1.理解 RDD 的算子分类, 以及其特性
2.理解常见算子的使用

分类

RDD 中的算子从功能上分为两大类

  1. Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD
  2. Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中

RDD 中可以存放各种类型的数据, 那么对于不同类型的数据, RDD 又可以分为三类

特点

2.1 Transformations 算子

map(T ⇒ U)

  .map( num => num * 10 )
  .collect()
map
map
作用
签名
def map[U: ClassTag](f: T ⇒ U): RDD[U]
参数

f → Map 算子是 原RDD → 新RDD 的过程, 传入函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据

注意点

flatMap(T ⇒ List[U])

sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
  .flatMap( line => line.split(" ") )
  .collect()
flatMap
flatMap
作用
调用
def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
参数

f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD

注意点

filter(T ⇒ Boolean)

sc.parallelize(Seq(1, 2, 3))
  .filter( value => value >= 3 )
  .collect()
filter
filter
作用

mapPartitions(List[T] ⇒ List[U])

mapPartitionsWithIndex

mapValues

sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .mapValues( value => value * 10 )
  .collect()
mapvalues
mapvalues
作用

sample(withReplacement, fraction, seed)

sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  .sample(withReplacement = true, 0.6, 2)
  .collect()
sample
sample
作用
参数

union(other)并集

val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(4, 5, 6))
rdd1.union(rdd2)
  .collect()
union
union

intersection(other)

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
rdd1.intersection(rdd2)
  .collect()
image.png
image.png
作用

subtract(other, numPartitions)(差集)

distinct(numPartitions)(去重)

sc.parallelize(Seq(1, 1, 2, 2, 3))
  .distinct()
  .collect()
distinct
distinct
作用
注意点

reduceByKey((V, V) ⇒ V, numPartition)

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .reduceByKey( (curr, agg) => curr + agg )
  .collect()
reduceByKey
reduceByKey
作用

首先按照 Key 分组生成一个 Tuple, 然后针对每个组执行 reduce 算子

调用
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
参数

func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果

注意点

ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2
ReduceByKey 是一个需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少

groupByKey()

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .groupByKey()
  .collect()
groupByKey
groupByKey
作用
注意点

combineByKey()

val rdd = sc.parallelize(Seq(
  ("zhangsan", 99.0),
  ("zhangsan", 96.0),
  ("lisi", 97.0),
  ("lisi", 98.0),
  ("zhangsan", 97.0))
)

val combineRdd = rdd.combineByKey(
  score => (score, 1),
  (scoreCount: (Double, Int),newScore) => (scoreCount._1 + newScore, scoreCount._2 + 1),
  (scoreCount1: (Double, Int), scoreCount2: (Double, Int)) =>
    (scoreCount1._1 + scoreCount2._1, scoreCount1._2 + scoreCount2._2)
)

val meanRdd = combineRdd.map(score => (score._1, score._2._1 / score._2._2))

meanRdd.collect()
combineByKey
作用
调用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner],[serializer])
参数
注意点

aggregateByKey()

val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
val result = rdd.aggregateByKey(0.8)(
  seqOp = (zero, price) => price * zero,
  combOp = (curr, agg) => curr + agg
).collect()
println(result)
aggregateByKey
作用

聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value

调用

rdd.aggregateByKey(zeroValue)(seqOp, combOp)

参数

zeroValue 初始值
seqOp 转换每一个值的函数
comboOp 将转换过的值聚合的函数

注意点 为什么需要两个函数?

aggregateByKey 运行将一个 RDD[(K, V)] 聚合为 RDD[(K, U)], 如果要做到这件事的话, 就需要先对数据做一次转换, 将每条数据从 V 转为 U, seqOp 就是干这件事的 当 seqOp 的事情结束以后, comboOp 把其结果聚合
和 reduceByKey 的区别

foldByKey(zeroValue)((V, V) ⇒ V)

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .foldByKey(zeroValue = 10)( (curr, agg) => curr + agg )
  .collect()
foldByKey
foldByKey
作用

和 ReduceByKey 是一样的, 都是按照 Key 做分组去求聚合, 但是 FoldByKey 的不同点在于可以指定初始值

调用

foldByKey(zeroValue)(func)

参数

zeroValue 初始值
func seqOp 和 combOp 相同, 都是这个参数

注意点

FoldByKey 是 AggregateByKey 的简化版本, seqOp 和 combOp 是同一个函数
FoldByKey 指定的初始值作用于每一个 Value

join(other, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))

rdd1.join(rdd2).collect()
Join
作用

将两个 RDD 按照相同的 Key 进行连接

调用
join(other, [partitioner or numPartitions])
参数

other 其它 RDD
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数量来改变分区

注意点

cogroup(other, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 5), ("b", 2), ("b", 6), ("c", 3), ("d", 2)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("b", 1), ("d", 3)))
val rdd3 = sc.parallelize(Seq(("b", 10), ("a", 1)))

val result1 = rdd1.cogroup(rdd2).collect()
val result2 = rdd1.cogroup(rdd2, rdd3).collect()

/*
执行结果:
Array(
  (d,(CompactBuffer(2),CompactBuffer(3))),
  (a,(CompactBuffer(1, 2, 5),CompactBuffer(10))),
  (b,(CompactBuffer(2, 6),CompactBuffer(1))),
  (c,(CompactBuffer(3),CompactBuffer()))
)
 */
println(result1)

/*
执行结果:
Array(
  (d,(CompactBuffer(2),CompactBuffer(3),CompactBuffer())),
  (a,(CompactBuffer(1, 2, 5),CompactBuffer(10),CompactBuffer(1))),
  (b,(CompactBuffer(2, 6),CompactBuffer(1),Co...
 */
println(result2)
cogroup
作用

多个 RDD 协同分组, 将多个 RDD 中 Key 相同的 Value 分组

调用
cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
参数

rdd…​ 最多可以传三个 RDD 进去, 加上调用者, 可以为四个 RDD 协同分组
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数来改变分区

注意点

对 RDD1, RDD2, RDD3 进行 cogroup, 结果中就一定会有三个 List, 如果没有 Value 则是空 List, 这一点类似于 SQL 的全连接, 返回所有结果, 即使没有关联上
CoGroup 是一个需要 Shuffled 的操作

cartesian(other)

(RDD[T], RDD[U]) ⇒ RDD[(T, U)]

sortBy(ascending, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val sortByResult = rdd1.sortBy( item => item._2 ).collect()
val sortByKeyResult = rdd1.sortByKey().collect()

println(sortByResult)
println(sortByKeyResult)
作用

排序相关相关的算子有两个, 一个是 sortBy, 另外一个是 sortByKey

调用
sortBy(func, ascending, numPartitions)
参数

func 通过这个函数返回要排序的字段
ascending 是否升序
numPartitions 分区数

注意点

普通的 RDD 没有 sortByKey, 只有 Key-Value 的 RDD 才有
sortBy 可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序

partitionBy(partitioner)

coalesce(numPartitions)

val rdd = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val oldNum = rdd.partitions.length

val coalesceRdd = rdd.coalesce(4, shuffle = true)
val coalesceNum = coalesceRdd.partitions.length

val repartitionRdd = rdd.repartition(4)
val repartitionNum = repartitionRdd.partitions.length

print(oldNum, coalesceNum, repartitionNum)
作用

一般涉及到分区操作的算子常见的有两个, repartitioin 和 coalesce, 两个算子都可以调大或者调小分区数量

调用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
参数

numPartitions 新的分区数
shuffle 是否 shuffle, 如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效

注意点

repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
repartition 是一个 Shuffled 操作

repartition(numPartitions)

repartitionAndSortWithinPartitions

常见的 Transformation 类型的 RDD

map
flatMap
filter
groupBy
reduceByKey

常见的 Action 类型的 RDD

collect
countByKey
reduce

上一篇 下一篇

猜你喜欢

热点阅读