10. 左外连接
date[2019-01-01]
Data Algorithms(chapter4)
数据说明(文件中各字段以制表符 \t 分隔,下面的展示中显示为空格;users.tsv:第 1 列为用户ID,第 2 列为地区;transactions.tsv:第 2 列为商品ID,第 3 列为用户ID)
[hadoop@chen spark-data]$ cat ch4/input/users.tsv
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA
[hadoop@chen spark-data]$ cat ch4/input/transactions.tsv
t1 p3 u1 1 300
t2 p1 u2 1 100
t3 p1 u1 1 100
t4 p2 u2 1 10
t5 p4 u4 1 9
t6 p1 u1 1 100
t7 p4 u1 1 9
t8 p4 u5 2 40
Left Outer Join
package org.dataalgorithms.chap04.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* Demonstrates how to do "left outer join" on two RDD
* without using Spark's inbuilt feature 'leftOuterJoin'.
*
* The main purpose here is to show the comparison with Hadoop
* MapReduce shown earlier in the book and is only for demonstration purpose.
* For your project we suggest to use Spark's built-in feature
* 'leftOuterJoin' or use DataFrame (highly recommended).
*
* @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
* @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
* *
*/
object LeftOuterJoin {

  /**
   * Joins transactions with users by hand (union + groupByKey) and writes,
   * for every product, the number of distinct locations it was sold in.
   *
   * Expected arguments:
   *  - args(0): path of the tab-separated users file (userId, location)
   *  - args(1): path of the tab-separated transactions file
   *             (column 1 is the product, column 2 is the userId, 0-based)
   *  - args(2): output path; one "(product,locationCount)" line per product
   *
   * Transactions whose user has no location record are attributed to the
   * location "UNKNOWN"; users without transactions contribute nothing.
   *
   * @param args command line arguments as described above
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("Usage: LeftOuterJoin <users-data-path> <transactions-data-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("LeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    // Make sure the context is released even when one of the jobs below fails.
    try {
      val usersInputFile = args(0)
      val transactionsInputFile = args(1)
      val output = args(2)

      val usersRaw = sc.textFile(usersInputFile)
      val transactionsRaw = sc.textFile(transactionsInputFile)

      // (userId, ("L", location)) -- locations are tagged with "L"
      val users = usersRaw.map { line =>
        val tokens = line.split("\t")
        (tokens(0), ("L", tokens(1)))
      }

      // (userId, ("P", product)) -- products are tagged with "P"
      val transactions = transactionsRaw.map { line =>
        val tokens = line.split("\t")
        (tokens(2), ("P", tokens(1)))
      }

      // Debug output. foreach runs where the partitions live, so these lines
      // show up on the driver console only when running in local mode.
      println("===========users==========")
      users.foreach(println)
      println("===========transactions==========")
      transactions.foreach(println)

      // This operation is expensive and is listed to compare with Hadoop
      // MapReduce approach, please compare it with more optimized approach
      // shown in SparkLeftOuterJoin.scala or DataFramLeftOuterJoin.scala
      val all = users union transactions
      println("===========all==========")
      all.foreach(println)

      val grouped = all.groupByKey()
      println("===========group==========")
      grouped.foreach(println)

      // Emit one (product, location) pair per product a user bought.
      val productLocations = grouped.flatMap {
        case (_, tagged) =>
          // groupByKey gives no guarantee about the order of the values inside
          // a group, so the location record is searched for instead of being
          // assumed to come first (which is what `span` silently relied on).
          val location = tagged.collectFirst { case ("L", loc) => loc }.getOrElse("UNKNOWN")
          // toSet drops repeated purchases of the same product by this user
          // before the pairs are shuffled.
          tagged.collect { case ("P", product) => (product, location) }.toSet
      }

      // Different users may share a location, so de-duplicate the pairs across
      // users before counting; otherwise a location would be counted once per
      // user instead of once per product.
      val result = productLocations
        .distinct()
        .map { case (product, _) => (product, 1) }
        .reduceByKey(_ + _) // (product, number of distinct locations)

      result.saveAsTextFile(output) // Saves output to the file.
    } finally {
      sc.stop()
    }
  }
}
输出:
===========users==========
(u4,(L,CA))
(u1,(L,UT))
(u5,(L,GA))
(u2,(L,GA))
(u3,(L,CA))
===========transactions==========
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========all==========
(u4,(L,CA))
(u5,(L,GA))
(u1,(L,UT))
(u2,(L,GA))
(u3,(L,CA))
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========group==========
(u3,CompactBuffer((L,CA)))
(u5,CompactBuffer((L,GA), (P,p4)))
(u2,CompactBuffer((L,GA), (P,p1), (P,p2)))
(u1,CompactBuffer((L,UT), (P,p3), (P,p1), (P,p1), (P,p4)))
(u4,CompactBuffer((L,CA), (P,p4)))
============= RESULT ==============
(p4,3)
(p1,2)
(p2,1)
(p3,1)