SparkRDD的键值对操作

2017-05-03  本文已影响0人  Spike_3154

pairRDD

Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为pair RDD1。 PairRDD 是很多程序的构成要素, 因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。

创建方法

pairs = lines.map(lambda x: (x.split(" ")[0], x))

pairRDD的转化操作

pairRDD的聚合操作

#这里使用reduceByKey以及MapValues来计算相同的键的值的平均值
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
nums = sc.parallelize([(1,2), (1,3), (1,6), (2,4), (2, 7), (2, 9)])
avg = nums.mapValues(lambda x : (x, 1)).reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1]))
result = avg.mapValues(lambda x : float(x[0])/float(x[1]))
print (result.first())
#使用reduceByKey来进行词频统计
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("usePairRDDWordCount.py")
words = lines.flatMap(lambda x:x.split(" "))

wordsMap = words.map(lambda x:(x, 1)).reduceByKey(lambda x, y : x + y)
print(wordsMap.first())
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("./words.map")
#wordsMap = lines.map(lambda x : (x.split(" ")[0], x.split(" ")[1]))
wordsMap = lines.map(lambda x : x.split(" ")).map(lambda x : (x[0], int(x[1])))
sumCount = wordsMap.combineByKey(
    (lambda x:(x, 1)),                             #createCombiner()
    (lambda x, y:(x[0] + y, x[1] + 1)),            #mergeValue()
    (lambda x, y:(x[0] + y[0], x[1] + y[1]))       #mergeContainers()
)
#averg = sumCount.map(lambda xy : (xy[0], xy[1][0]/xy[1][1])).collectAsMap()
#print(averg["coffee"])
averg = sumCount.map(lambda xy : (xy[0], xy[1][0]/xy[1][1]))
print(averg.first())
print(averg.getNumPartitions())
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y) # 默认并行度
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定义并行度

数据分组

如果程序中先使用了groupByKey() 然后再使用了 reduce() 或者fold() 的代码,很可
能可以通过使用一种根据键进行聚合的函数来更高效地实现同样的效果。例如, 
rdd.reduceByKey(func)与 rdd.groupByKey().mapValues(value => 
value.reduce(func)) 等价,但是前者更为高效,因为它避免了为每个键创建存放值的列表的步骤。

连接

排序

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("usePairRDD.py")
pairs = lines.map(lambda x : (x.split(' ')[0], x))
res = pairs.sortByKey(ascending = True, numPartitions=None, keyfunc=lambda 
x:str(x))
#res = pairs.sortByKey(True, None, lambda x : str(x)) #这种或者上面那种形式都可以的
print (res.first())

RDD行动操作

数据分区(难点)

Spark 程序可以通过控制RDD 分区方式来减少通信开销。 分区并不是对所有应用都有好处的——比如,如果给定RDD 只需要被扫描一次, 我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时, 分区才会有帮助。
Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark 没有给出显示控制每个键具体落在哪一个工作节点上的方法(部分原因是Spark 即使在某些节点失败时依然可以工作),但 Spark 可以确保同一组的键出现在同一个节点上。

// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
    println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

对上面的代码,每次调用 processNewLogs() 时都会用到 join() 操作,而我们对数据集是如何分区的却一无所知。接操作会将两个数据集中的所有键的哈希值都求出来, 将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作(见图 4-4)。因为 userData 表比每五分钟出现的访问日志表 events 要大得多,所以要浪费时间做很多额外工作:在每次调用时都对 userData 表进行哈希值计算和跨节点数据混洗,虽然这些数据从来都不会变化。其网络通信图如下所示:

更改上面的代码(Scala)

//这里创建了自定义分区
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100)) // 构造100个分区
  .persist()
//python是这样使用的: rdd.partitionBy(100)

在 processNewLogs() 中, eventsRDD 是 本 地 变量,只在该方法中使用了一次, 所以为 events 指定分区方式没有什么用处。由于在构建 userData 时调用了 partitionBy(), Spark 就知道了该 RDD 是根据键的哈希值来分区的,这样在调用 join() 时, Spark 就会利用到这一点。具体来说,当调用 userData.join(events) 时, Spark 只会对 events 进行数据混洗操作,将 events 中特定 UserID 的记录发送到 userData 的对应分区所在的那台机器上(见图 4-5)。这样,需要通过网络传输的数据就大大减少了,程序运行速度可以显著提升了。这种方式的额网络通信图如下所示。注意上面需要将partitionBy的结果持久化,不进行持久化会导致整个 RDD 谱系图重新求值。那样的话, partitionBy() 带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况十分相似。


从分区中可以获益的操作

cogroup()、groupWith()、 join()、 leftOuterJoin()、 rightOuterJoin()、 groupByKey()、 reduceByKey()、combineByKey() 以及 lookup()。
对于 reduceByKey() 这样只作用于单个 RDD 的操作,运行在未分区的 RDD 上的时候会导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结果值从各工作节点传回主节点, 所以原本的网络开销就不算大。
对于 cogroup() 和join() 这样的二元操作,预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个 RDD)不发生数据混洗。如果两个 RDD 使用同样的分区方式, 并且它们还缓存在同样的机器上(比如一个 RDD 是通过 mapValues() 从另一个 RDD 中创建出来的,这两个RDD 就会拥有相同的键和分区方式),或者其中一个 RDD 还没有被计算出来,那么跨节点的数据混洗就不会发生了。

影响分区方式的操作

会为生成的结果 RDD 设好分区方式的操作:cogroup()、 groupWith()、 join()、 lef tOuterJoin()、 rightOuterJoin()、 groupByKey()、reduceByKey()、combineByKey()、 partitionBy()、 sort()、 mapValues()(如果父 RDD 有分区方式的话)、flatMapValues()
对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式。默认情况下,结果会采用哈希分区, 分区的数量和操作的并行度一样。不过,如果其中的一个父 RDD 已经设置过分区方式, 那么结果就会采用那种分区方式;如果两个父 RDD 都设置过分区方式,结果 RDD 会采用第一个父 RDD 的分区方式。

PageRank算法

计算步骤:算法会维护两个数据集: 一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值。计算的步骤如下:

  1. 将每个页面的排序值初始化为 1.0。
  2. 在每次迭代中, 对页面 p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p) 的贡献值。
  3. 将每个页面的排序值设为 0.15 + 0.85 * contributionsReceived
// 假设相邻页面列表以Spark objectFile的形式存储
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
// 将每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD
// 的分区方式会和"links"的一样
var ranks = links.mapValues(v => 1.0)
// 运行10轮PageRank迭代
for(i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
//这里的value表示的含义是每个key收到的所有的引用贡献值。
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// 写出最终排名
ranks.saveAsTextFile("ranks")

上面程序的一些注意点:

  1. linksRDD 在每次迭代中都会和 ranks 发生连接操作。links 是一个静态数据集,程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。 linksRDD 的字节数一般来说也会比 ranks 大很多,毕竟它包含每个页面的相邻页面列表(由页面 ID 组成),而不仅仅是一个 Double 值,因此这一优化相比 PageRank 的原始实现(例如普通的 MapReduce)节约了相当可观的网络通信开销。
  2. 出于同样的原因, 我们调用 links 的 persist() 方法,将它保留在内存中以供每次迭代使用。
  3. 当我们第一次创建 ranks 时,我们使用 mapValues() 而不是 map() 来保留父 RDD(links)的分区方式,这样对它进行的第一次连接操作就会开销很小。
  4. 在循环体中, 我们在 reduceByKey() 后使用 mapValues();因为 reduceByKey() 的结果已经是哈希分区的了, 这样一来,下一次循环中将映射操作的结果再次与 links 进行连接操作时就会更加高效。

自定义分区

假设我们要在一个网页的集合上运行前一节中的 PageRank 算法。在这里,每个页面的 ID(RDD 中的键)是页面的 URL。当我们使用简单的哈希函数进行分区时,拥有相似的 URL 的页面(比如http://www.cnn.com/WORLDhttp://www.cnn.com/US可能会被分到完全不同的节点上。 )然而,我们知道在同一个域名下的网页更有可能相互链接。由于 PageRank 需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因此把这些页面分组到同一个分区中会更好。 可以使用自定义的分区器来实现仅根据域名而不是整个 URL 来分区。这种方式的python代码如下所示:

import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20, hash_domain) # 创建20个分区
上一篇 下一篇

猜你喜欢

热点阅读