Spark Core - 高效的使用 RDD join
Spark 作为分布式的计算框架,最为影响其执行效率的地方就是频繁的网络传输。所以一般的,在不存在数据倾斜的情况下,想要提高 Spark job 的执行效率,就尽量减少 job 的 shuffle 过程(减少 job 的 stage),或者退而减小 shuffle 带来的影响,join 操作也不例外。
所以,针对 spark RDD 的 join 操作的使用,提供一下几条建议:
- 尽量减少参与 join 的 RDD 的数据量。
- 尽量避免参与 join 的 RDD 都具有重复的key。
- 尽量避免或者减少 shuffle 过程。
- 条件允许的情况下,使用 map-join 完成 join。
我们来举个例子,现在一共有两个 RDD,一个是元素为 (String, Double) 类型的 userScoresRDD
, 其 key 代表用户 id,其 value 代表用户游戏的历史分数,id 与分数为一对多的关系。另一个为元素为 (String, String) 的 userMobileRDD
, 其 key 代表用户的id,value 代表用户的手机号码。我们现在需要得到每个用户的最高分以及其手机号,使得可以使用短信的方式向每个用户告知其最高的游戏记录(发短信有些浪费了)。
尽量减少参与 join 的 RDD 的数据量
按照套路,先举个反例,如下:
代码 1
def joinGetUserBestScoreWithMobile1(userScoresRDD: RDD[(String, Double)],
userMobileRDD: RDD[(String, String)])
: RDD[(String, (Double, String))] = {
val userScoreAndMobile = userScoresRDD.join(userMobileRDD)
userScoreAndMobile.reduceByKey((x, y) => if (x._1 > y._1) x else y)
}
在上面的例子中,先进行的 join 操作,在用户的每条游戏记录上都添加了一枚手机号,然后在带着手机号的 RDD 上通过 reduceByKey 得到每个用户最高分已经手机号。
这样做明显会影响效率,我们明显可以先算出每个用户的最高分,然后在去得到他的手机号:
代码 2
def joinGetUserBestScoreWithMobile2(userScoresRDD: RDD[(String, Double)],
userMobilesRDD: RDD[(String, String)])
: RDD[(String, (Double, String))] = {
val userBestScore = userScoresRDD.reduceByKey((x, y) => if (x > y) x else y)
userBestScore.join(userMobilesRDD)
}
两种都使用的reduceByKey,但后者会明显减少参与 join 操作的数据量,即减少了shuffle 的时间,又减少了计算的时间,增加效率,降低了数据的冗余。
尽量避免参与 join 的 RDD 都具有重复的key
此条建议是为了避免发生两个RDD full join 而笛卡尔积的情况。
在我们的例子中,假如每个用户都拥有多个手机号,为了避免 full join 而使数据暴增,我们可以在代码2的基础上,先对 userMobilesRDD 使用 combileByKey 进行处理,减少重复的 key。
尽量避免或者减少 shuffle 过程
Join 怎么才能避免或减少 shuffle 操作呢? 我们知道只有父子RDD的依赖关系为宽依赖的时候,才会发生shuffle,所以关键就是控制父子RDD的依赖关系。join 操作有两个父RDD(即被join的RDD),一个子RDD(join后的结果),首先需要了解一下join操作时依赖的判断过程。下面即为过程源代码:
Spark 源代码 org.apache.spark.rdd.CoGroupedRDD
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
其中 part
为 join 所使用的分区器,rdds 为参加 join 的RDD。通过代码,我们就可以了解到,当父RDD与 join操作 使用相同的分区器的时候,父子RDD才会建立窄依赖(OneToOneDependency)关系,否则就使用宽依赖关系,并且 shuffle 使用join操作的分区器来进行分区。
所以最差情况下,如下图一,两个父RDD的分区器与 join 使用的分区都不相同(一般是父RDD的分区器都为 None),两个父RDD到子RDD,都会进行shuffle操作:
图一好一点的情况,如下图二,即只有一个父RDD的分区器与 join操作 所使用的相同。这样只会在一个RDD上发生 shuffle。
图二最后就是最完美的情况,两个父RDD的分区器都与join操作使用的分区器相同。如下图三,不会发生任何shuffle操作:
图三所以,我们可以实际情况,至少减少一次不必要的 shuffle 操作。
下一步我们要做的就是指定父RDD与 join 操作的的分区器为相同的。我们知道,许多的宽依赖操作都可以为其指定分区器,以决定其生成的RDD所使用的分区器,比如 reduceByKey。当然 join 操作也例外,所以我们可以在 join 的时候传入指定的分区器,这样来达到我们想要减少 shuffle 的目的。但是,当我们为两个父RDD指定了相同的分区器的时候,就不需要再为 join 操作传入指定的分区器,这是因为join操作会拿到两个父RDD的中分区器中分区数多的那个分区器作为默认分区器。
关于 join 操作获取默认分区器的详细,具体请看源代码(org.apache.spark.Partitioner 的 defaultPartitioner)
实践一下
让我们回到我们的例子,可以发现我们在使用 reduceByKey 生成 userBestScoreRDD 的时候,使用的是 userMobilesRDD 的分区器(或者在 join 时将要被使用的分区器)。
def joinGetUserBestScoreWithMobile4(userScoresRDD: RDD[(String, Double)],
userMobilesRDD: RDD[(String, String)]): RDD[(String, (Double, Option[String]))] = {
// 如果 userMobilesRDD 存在已知的 partitioner,就直接获取
// 没有就构建返回 userMobilesRDD 将要默认使用的 HashPartitioner.
val mobileRDDPartitioner = userMobilesRDD.partitioner match {
case (Some(p)) => p
case (None) => new HashPartitioner(userMobilesRDD.partitions.length)
}
//
val userBestScoreRDD = userScoresRDD.reduceByKey(mobileRDDPartitioner,
(x,y) => if (x > y) x else y)
// 在做 join 的时候。至少省去了一次 shuffle 的所带来的代价。
userBestScoreRDD.join(userMobilesRDD)
}
仔细分析的话,在整个joinGetUserBestScoreWithMobile4
方法里,相比于之前的代码示例,我们至少减少了一次shuffle操作。这取决于userMobilesRDD的分区器情况。如果userMobilesRDD没有分区器(为None),则userMobilesRDD在参与join的时候会进行 shuffle 操作,而userBestScoreRDD则不会发生shuffle操作。则一共的shuffle次数为2(加上 reduceByKey 一次).这也就是我们所说的“好一点的情况”。
如果userMobilesRDD已经有了分区器,则 userMobilesRDD 与 userBestScoreRDD 在join的时候都不需要shuffle,所以仅仅 reduceByKey 进行了一次shuffle.这也就是我们所说的“完美情况”。
两个分区器怎样才叫做相同,具体要看分区器 equals 方法的实现,以HashPartitioner为例,分区数相同,分区器就相同。
条件允许的情况下,使用 map-join 完成 join
Map join 想必都很熟悉,就不在写介绍了。Spark core 没有提供 map-join 的实现,具体的实现方案就是将小的 RDD 持久化到driver中后,广播到大RDD的各个分区中,自己实现 join 操作。较为通用的代码如下:
def manualBroadCastHashJoin[K: ClassTag, V1: ClassTag, V2: ClassTag](
smallRDD: RDD[(K, V1)],
bigRDD: RDD[(K, V2)],
sc: SparkContext): RDD[(K, (V1, V2))] = {
val smallDataLocaled: Map[K, V1] = smallRDD.collectAsMap()
bigRDD.sparkContext.broadcast(smallDataLocaled)
bigRDD.mapPartitions(p => {
p.flatMap {
case (k, v2) =>
smallDataLocaled.get(k) match {
case None => Seq.empty[(K, (V1, V2))]
case Some(v1) => Seq((k, (v1, v2)))
}
}
}, preservesPartitioning = true)
}
End!!