PySpark Learning

Case 4: Optimizing the PageRank Algorithm

2018-01-14  7125messi

Optimizing RDD joins with partitionBy()
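Before the Spark code, it helps to see what partitionBy(2, hash) actually does: each key is routed to partition number partition_func(key) % numPartitions, so two RDDs partitioned the same way have every key co-located, and Spark can join them partition by partition without reshuffling both sides. A minimal pure-Python sketch of that routing (assign_partitions is a hypothetical helper for illustration only; Spark's default partition function is portable_hash, not the builtin hash used here):

```python
# Sketch of hash partitioning: key k goes to partition partition_func(k) % n.
# (Illustration only; not Spark's actual implementation.)
def assign_partitions(pairs, num_partitions, partition_func=hash):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions

page_ranks = [('a', 1), ('b', 1), ('c', 1), ('d', 1)]
page_links = [('a', ['b', 'c', 'd']), ('b', ['d', 'c']),
              ('c', ['b']), ('d', ['a', 'c'])]

ranks_parts = assign_partitions(page_ranks, 2)
links_parts = assign_partitions(page_links, 2)

# Both datasets used the same partitioner, so every key lands in the same
# partition index on both sides -- a join needs no cross-partition shuffle.
for i in range(2):
    assert {k for k, _ in ranks_parts[i]} == {k for k, _ in links_parts[i]}
```

Because the two RDDs in the job below share the same partitioner and are persisted, this co-location is computed once and reused across all 20 join iterations.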

pageLinks =  [['a' ,['b','c','d']],
               ['b', ['d','c']],
               ['c', ['b']],
               ['d', ['a','c']]]
pageRanks =  [['a',1],
               ['b',1],
               ['c',1],
               ['d',1]]

numIter = 20

# Co-partition both RDDs with the same partitioner and cache them, so the
# repeated join below does not reshuffle pageLinksRDD on every iteration.
pageRanksRDD = sc.parallelize(pageRanks, 2).partitionBy(2, hash).persist()
pageLinksRDD = sc.parallelize(pageLinks, 2).partitionBy(2, hash).persist()
s = 0.85  # damping factor in the PageRank update (1 - s) + s * contribution
def rankContribution(uris, rank):
    # Split a page's rank evenly among the URIs it links to.
    numberOfUris = len(uris)
    contribution = float(rank) / numberOfUris
    newrank = []
    for uri in uris:
        newrank.append((uri, contribution))
    return newrank
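As a quick standalone check of the contribution step (the function is repeated here so the snippet runs without Spark):

```python
# Standalone copy of rankContribution so the example runs outside Spark.
def rankContribution(uris, rank):
    numberOfUris = len(uris)
    contribution = float(rank) / numberOfUris
    return [(uri, contribution) for uri in uris]

# Page 'a' (rank 1) links to b, c and d, so each receives 1/3 of its rank.
print(rankContribution(['b', 'c', 'd'], 1))
# [('b', 0.3333333333333333), ('c', 0.3333333333333333), ('d', 0.3333333333333333)]
```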

for i in range(numIter):
    # pageLinksRDD keeps its partitioner (persisted above), so it is not
    # reshuffled here; only the ranks side may move.
    linksRank = pageLinksRDD.join(pageRanksRDD)
    contributedRDD = linksRank.flatMap(lambda x: rankContribution(x[1][0], x[1][1]))
    sumRanks = contributedRDD.reduceByKey(lambda v1, v2: v1 + v2)
    # New rank = (1 - s) + s * (summed contributions). Note that mapValues
    # would preserve the partitioner here, whereas map does not.
    pageRanksRDD = sumRanks.map(lambda x: (x[0], (1 - s) + s * x[1]))

pageRanksRDD.collect()

Output:
[('b', 1.357243795127982),
('d', 0.8746512999550939),
('a', 0.5217268024809147),
('c', 1.2463781024360086)]
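The same 20 iterations can be replicated in plain Python on this 4-page graph, which is a handy way to sanity-check the Spark output without a cluster (a sketch of the identical update rule, not part of the original post):

```python
# Pure-Python replication of the same PageRank iterations on the 4-page graph.
links = {'a': ['b', 'c', 'd'], 'b': ['d', 'c'], 'c': ['b'], 'd': ['a', 'c']}
ranks = {p: 1.0 for p in links}
s = 0.85  # damping factor, as in the Spark job

for _ in range(20):
    contribs = {p: 0.0 for p in links}
    for page, outlinks in links.items():
        share = ranks[page] / len(outlinks)  # rankContribution's split
        for target in outlinks:
            contribs[target] += share        # reduceByKey's sum
    ranks = {p: (1 - s) + s * c for p, c in contribs.items()}

print(ranks)  # should agree with the Spark output above
```

Note that the ranks sum to 4.0: every page distributes all of its rank, so total mass is conserved at the number of pages.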
