数据算法 Hadoop/Spark 大数据处理 —— 第九章:好友推荐
2018-07-08
- 基于 Spark Java API 实现
- 基于 Spark Scala API 实现
// Driver for the Java/Spark friend-recommendation job: reads "<person>\t<f1>,<f2>,..." lines,
// emits direct-friend and candidate-friend records, groups them per user, and prints the
// resulting recommendations.
// NOTE(review): this listing has lost its closing-brace lines (and apparently a few
// statements), so it does not compile as shown; block scoping below is inferred.
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: SparkFriendRecommendation <input-path>");
// NOTE(review): nothing visible stops execution after the usage message; with no
// arguments, args[0] below throws ArrayIndexOutOfBoundsException. Consider System.exit(1).
final String friendsInputPath = args[0];
// create the first RDD from input
// (SparkUtil is a project helper, presumably building a JavaSparkContext with this app name)
JavaSparkContext ctx = SparkUtil.createJavaSparkContext("SparkFriendRecommendation");
// second argument of textFile: minimum number of partitions
JavaRDD<String> records = ctx.textFile(friendsInputPath, 1);
// debug1: pull every input line back to the driver and print it.
// collect() materializes the whole RDD in driver memory -- only suitable for small inputs.
List<String> debug1 = records.collect();
for (String t : debug1) {
System.out.println("debug1 record="+t);
// flatMapToPair
// <K2,V2> JavaPairRDD<K2,V2> flatMapToPair(PairFlatMapFunction<T,K2,V2> f)
// Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
// PairFlatMapFunction<T, K, V>
// T => Iterator<Tuple2<K, V>>   (call() below returns an Iterator)
//
// Records emitted, keyed by user ID:
//   (person,  (friend,  -1))     friend is already a direct friend of person
//   (friendI, (friendJ, person)) friendJ is a candidate for friendI, with person as mutual friend
JavaPairRDD<Long, Tuple2<Long,Long>> pairs =
// T = String (input line), K = Long (user), V = Tuple2<Long,Long> (other user, mutual friend or -1)
records.flatMapToPair(new PairFlatMapFunction<String, Long, Tuple2<Long,Long>>() {
public Iterator<Tuple2<Long,Tuple2<Long,Long>>> call(String record) {
// record=<person><TAB><friend1><,><friend2><,><friend3><,>...
// NOTE(review): a line with no TAB-separated friend list makes tokens[1] throw
// ArrayIndexOutOfBoundsException; non-numeric IDs make Long.parseLong throw NumberFormatException.
String[] tokens = record.split("\t");
long person = Long.parseLong(tokens[0]);
String friendsAsString = tokens[1];
String[] friendsTokenized = friendsAsString.split(",");
// friends: person's parsed friend IDs, used below to enumerate pairs of friends.
// NOTE(review): no visible line adds to this list (e.g. friends.add(toUser) in the parsing
// loop), so friends.size() stays 0 and the pair loops below emit nothing -- confirm.
List<Long> friends = new ArrayList<Long>();
List<Tuple2<Long,Tuple2<Long, Long>>> mapperOutput =
new ArrayList<Tuple2<Long,Tuple2<Long, Long>>>();
for (String friendAsString : friendsTokenized) {
long toUser = Long.parseLong(friendAsString);
// -1 marks "already direct friends"; the grouping step uses it to suppress this candidate.
Tuple2<Long,Long> directFriend = T2(toUser, -1L);
mapperOutput.add(T2(person, directFriend));
// Each unordered pair (i < j) of person's friends are candidates for each other, with
// person as their mutual friend; the candidate is emitted in both directions.
// NOTE(review): these loops should run once over the complete friend list (i.e. after the
// parsing loop above); the missing braces make their scope ambiguous in this listing.
for (int i = 0; i < friends.size(); i++) {
for (int j = i + 1; j < friends.size(); j++) {
// possible friend 1
Tuple2<Long,Long> possibleFriend1 = T2(friends.get(j), person);
mapperOutput.add(T2(friends.get(i), possibleFriend1));
// possible friend 2
Tuple2<Long,Long> possibleFriend2 = T2(friends.get(i), person);
mapperOutput.add(T2(friends.get(j), possibleFriend2));
return mapperOutput.iterator();
/*
 * 输出的结果为:
 * debug2 key=1 value=(2,-1)
 * debug2 key=1 value=(3,-1)……等直接好友记录(-1 表示二人已是直接好友)
 * debug2 key=2 value=(3,1)
 * debug2 key=3 value=(2,1)……等候选好友记录(如 key=2 value=(3,1) 表示 3 是 2 的候选好友,共同好友为 1)
 */
// debug2: collect and print every record emitted by the flatMapToPair step
// (driver-side collect() -- only suitable for small inputs).
List<Tuple2<Long,Tuple2<Long,Long>>> debug2 = pairs.collect();
for (Tuple2<Long,Tuple2<Long,Long>> t2 : debug2) {
System.out.println("debug2 key="+t2._1+"\t value="+t2._2);
// Gather all (otherUser, mutualFriend-or--1) tuples of each user into one Iterable.
// groupByKey shuffles and keeps every value of a key together, which can be memory-heavy
// for users with very many records.
JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> grouped = pairs.groupByKey();
// debug3: print each user with its grouped tuples
List<Tuple2<Long, Iterable<Tuple2<Long, Long>>>> debug3 = grouped.collect();
for (Tuple2<Long, Iterable<Tuple2<Long, Long>>> t2 : debug3) {
System.out.println("debug3 key="+t2._1+"\t value="+t2._2);
// For each user, turn the grouped tuples into a map candidate -> list of mutual friends,
// marking candidates who are already direct friends, and format it via buildRecommendations.
// mapValues[U](f: (V) => U): JavaPairRDD[K, U]
// Pass each value in the key-value pair RDD through a map function without changing the keys;
// this also retains the original RDD's partitioning.
JavaPairRDD<Long, String> recommendations =
grouped.mapValues(new Function< Iterable<Tuple2<Long, Long>>, // input
String // final output
>() {
public String call(Iterable<Tuple2<Long, Long>> values) {
// mutualFriends.key = the recommended friend
// mutualFriends.value = the list of mutual friends, or null if key is already a direct friend
final Map<Long, List<Long>> mutualFriends = new HashMap<Long, List<Long>>();
for (Tuple2<Long, Long> t2 : values) {
final Long toUser = t2._1;
final Long mutualFriend = t2._2;
// Long compared with an int literal: the Long is unboxed, so this is a numeric comparison.
final boolean alreadyFriend = (mutualFriend == -1);
if (mutualFriends.containsKey(toUser)) {
// A direct-friend record overrides any mutual friends collected so far.
if (alreadyFriend) {
mutualFriends.put(toUser, null);
else if (mutualFriends.get(toUser) != null) {
// NOTE(review): this branch's body is missing from the listing; presumably it appends
// mutualFriend to the existing list. As shown, only the first mutual friend is kept.
else {
// First record seen for toUser.
if (alreadyFriend) {
mutualFriends.put(toUser, null);
else {
List<Long> list1 = new ArrayList<Long>(Arrays.asList(mutualFriend));
mutualFriends.put(toUser, list1);
return buildRecommendations(mutualFriends);
// debug4: collect and print the formatted recommendations of every user
List<Tuple2<Long,String>> debug4 = recommendations.collect();
for (Tuple2<Long,String> t2 : debug4) {
System.out.println("debug4 key="+t2._1+ "\t value="+t2._2);
// done
// NOTE(review): ctx is not stopped/closed in the visible lines.
// Formats one user's candidate -> mutual-friends map into a single string.
// A null value marks a candidate who is already a direct friend (the mapValues step stores
// null for those). Entry order follows the map's iteration order; for a HashMap it is unspecified.
// NOTE(review): several lines of this method appear lost from the listing. As shown, the
// null check has no visible body (presumably `continue`), and only " (" and ": " are
// appended -- not the candidate ID, the mutual-friend count or the list itself. Confirm.
static String buildRecommendations(Map<Long, List<Long>> mutualFriends) {
StringBuilder recommendations = new StringBuilder();
for (Map.Entry<Long, List<Long>> entry : mutualFriends.entrySet()) {
if (entry.getValue() == null) {
recommendations.append(" (");
recommendations.append(": ");
return recommendations.toString();
// Returns the two values as a Tuple2 with the smaller one first (equal values: (b, a)).
// NOTE(review): not called anywhere in the visible lines; closing braces are missing here.
static Tuple2<Long,Long> buildSortedTuple(long a, long b) {
if (a < b) {
return new Tuple2<Long, Long>(a,b);
else {
return new Tuple2<Long, Long>(b,a);
// Shorthand for new Tuple2<Long,Long>(a, b); the long arguments are autoboxed to Long.
static Tuple2<Long,Long> T2(long a, long b) {
return new Tuple2<Long,Long>(a, b);
// Shorthand for a keyed record (a, b); used above as (user, (otherUser, mutualFriend-or--1)).
static Tuple2<Long,Tuple2<Long,Long>> T2(long a, Tuple2<Long,Long> b) {
return new Tuple2<Long,Tuple2<Long,Long>>(a, b);
// Driver for the Scala/Spark friend-recommendation job.
// NOTE(review): closing-brace lines (and apparently some statements) are missing from this
// listing, so it does not compile as shown.
def main(args: Array[String]): Unit = {
if (args.size < 2) {
println("Usage: FriendRecommendation <input-path> <output-path>")
// NOTE(review): nothing visible stops execution after the usage message; with fewer than
// two arguments, args(0)/args(1) below throw. Consider sys.exit(1) here.
val sparkConf = new SparkConf().setAppName("FriendRecommendation")
val sc = new SparkContext(sparkConf)
val input = args(0)
// NOTE(review): `output` is not used in the visible lines (results are only printed).
val output = args(1)
val records = sc.textFile(input)
// Each line: "<person><whitespace><friend1>,<friend2>,..."
val pairs = records.flatMap(line => {
// "\\s" splits on every single whitespace character: leading or repeated whitespace yields
// empty tokens, and "".toLong then throws NumberFormatException.
val tokens = line.split("\\s")
val person = tokens(0).toLong
val friends = tokens(1).split(",").map(_.toLong).toList
// (person, (friend, -1L)) marks each existing direct friendship.
val mapperOutput = friends.map(directFriend => (person, (directFriend, -1.toLong)))
/*
 * 这一步(flatMap)输出的记录类似于:
 * debug2 key=1 value=(2,-1)
 * debug2 key=1 value=(3,-1)……等直接好友记录(-1 表示二人已是直接好友)
 * debug2 key=2 value=(3,1)
 * debug2 key=3 value=(2,1)……等候选好友记录(如 key=2 value=(3,1) 表示 3 是 2 的候选好友,共同好友为 1)
 */
// Any two distinct friends fi, fj of `person` are candidates for each other, with `person`
// as their mutual friend.
// NOTE(review): this iterates ordered pairs, so each unordered pair is visited twice and every
// candidate record is emitted twice (the Java version loops with j > i). The grouping step
// drops the duplicates via its `contains` check, but they still go through the shuffle.
val result = for {
fi <- friends
fj <- friends
possibleFriend1 = (fj, person)
possibleFriend2 = (fi, person)
if (fi != fj)
} yield {
(fi, possibleFriend1) :: (fj, possibleFriend2) :: List()
// This line's output: direct-friend markers followed by the candidate records.
mapperOutput ::: result.flatten
// note that groupByKey() provides an expensive solution
// [you must have enough memory/RAM to hold all values for
// a given key -- otherwise you might get OOM error], but
// combineByKey() and reduceByKey() will give a better
// scale-out performance
val grouped = pairs.groupByKey()
// For each user, build candidate -> list of mutual friends; List.empty marks a candidate who is
// already a direct friend (the Java version uses null for this).
val result = grouped.mapValues(values => {
// NOTE(review): `.empty` on a freshly created map just returns another empty map; redundant.
val mutualFriends = new collection.mutable.HashMap[Long, List[Long]].empty
values.foreach(t2 => {
val toUser = t2._1
val mutualFriend = t2._2
// Long compared with the Int literal -1: numeric comparison.
val alreadyFriend = (mutualFriend == -1)
if (mutualFriends.contains(toUser)) {
// A direct-friend record overrides any mutual friends collected so far.
if (alreadyFriend) {
mutualFriends.put(toUser, List.empty)
// Extend only non-empty lists (empty = already friends) and skip mutual friends already
// recorded -- the mapper's ordered-pair for-comprehension emits each candidate twice.
} else if (mutualFriends.get(toUser).isDefined && mutualFriends.get(toUser).get.size > 0 && !mutualFriends.get(toUser).get.contains(mutualFriend)) {
val existingList = mutualFriends.get(toUser).get
mutualFriends.put(toUser, (mutualFriend :: existingList))
} else {
// First record seen for toUser.
if (alreadyFriend) {
mutualFriends.put(toUser, List.empty)
} else {
mutualFriends.put(toUser, List(mutualFriend))
// NOTE(review): the lambda's final expression is missing from the listing. The printing code
// below treats each value as a map of candidate -> mutual-friend list, so presumably
// mutualFriends (perhaps without its List.empty entries) was returned here -- confirm.
// formatting and printing it to console for debugging purposes...
// NOTE(review): RDD.foreach runs on the executors, so in cluster mode these println calls go
// to executor stdout rather than the driver console (the Java version collect()s first).
result.foreach(f => {
val friends = if (f._2.isEmpty) "" else {
val items = f._2.map(tuple => (tuple._1, "(" + tuple._2.size + ": " + tuple._2.mkString("[", ",", "]") + ")")).map(g => "" + g._1 + " " + g._2)
// NOTE(review): the else-branch's final expression (joining `items` into one string) is
// missing from the listing.
println(s"${f._1}: ${friends}")
// done
// NOTE(review): sc is not stopped in the visible lines.