005 MapReduce 和 RDD的Join
2019-11-30 本文已影响0人
逸章
1. MapReduce
本例子演示用 MapReduce 风格进行数据处理(data processing),use case 如下:
- The retail banking transaction records come with account numbers and the transaction amounts in comma-separated strings.
- Pair the transactions to have key/value pairs such as (AccNo, TranAmount).
- Find an account level summary of all the transactions to get the account balance.
1.1 用spark-shell
scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)
scala> val acTransRDD = sc.parallelize(acTransList)
acTransRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val acKeyVal = acTransRDD.map(trans => (trans.split(",")(0), trans.split(",")(1).toDouble))
acKeyVal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[1] at map at <console>:25
scala> val accSummary = acKeyVal.reduceByKey(_ + _).sortByKey()
accSummary: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[5] at sortByKey at <console>:25
scala> accSummary.collect()
res0: Array[(String, Double)] = Array((SB10001,18900.0), (SB10002,8590.0), (SB10003,330.0), (SB10004,500.0), (SB10005,56.0))
scala>
1.2 用pyspark实现相同的功能
yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.master.Master-1-yay-ThinkPad-T470-W10DG.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.worker.Worker-1-yay-ThinkPad-T470-W10DG.out
yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/pyspark
Python 2.7.15+ (default, Oct 7 2019, 17:39:04)
[GCC 7.4.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
19/11/30 20:10:56 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
19/11/30 20:10:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/30 20:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Python version 2.7.15+ (default, Oct 7 2019 17:39:04)
SparkSession available as 'spark'.
>>> from decimal import Decimal
>>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10"]
>>> acTransRDD = sc.parallelize(acTransList)
>>> acKeyVal = acTransRDD.map(lambda trans: (trans.split(",")[0],Decimal(trans.split(",")[1])))
>>> accSummary = acKeyVal.reduceByKey(lambda a,b : a+b).sortByKey()
>>> accSummary.collect()
[('SB10001', Decimal('18900')), ('SB10002', Decimal('8590')), ('SB10003', Decimal('330')), ('SB10004', Decimal('500')), ('SB10005', Decimal('56'))]
>>>
2. RDD的Join
两个DataSet经过Join后形成一个新的DataSet
scala> val acMasterList = Array("SB10001,Roger,Federer", "SB10002,Pete,Sampras", "SB10003,Rafael,Nadal", "SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
acMasterList: Array[String] = Array(SB10001,Roger,Federer, SB10002,Pete,Sampras, SB10003,Rafael,Nadal, SB10004,Boris,Becker, SB10005,Ivan,Lendl)
scala> val acBalList = Array("SB10001,50000", "SB10002,12000", "SB10003,3000", "SB10004,8500", "SB10005,5000")
acBalList: Array[String] = Array(SB10001,50000, SB10002,12000, SB10003,3000, SB10004,8500, SB10005,5000)
scala> val acMasterRDD = sc.parallelize(acMasterList)
acMasterRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val acBalRDD = sc.parallelize(acBalList)
acBalRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> val acMasterTuples = acMasterRDD.map(master => master.split(",")).map(masterList => (masterList(0), masterList(1) + " " + masterList(2)))
acMasterTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:25
scala> val acBalTuples = acBalRDD.map(trans => trans.split(",")).map(transList => (transList(0), transList(1)))
acBalTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[5] at map at <console>:25
scala> val acJoinTuples = acMasterTuples.join(acBalTuples).sortByKey().map{case (accno, (name, amount)) => (accno, name,amount)}
acJoinTuples: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[12] at map at <console>:27
scala> acJoinTuples.collect()
res0: Array[(String, String, String)] = Array((SB10001,Roger Federer,50000), (SB10002,Pete Sampras,12000), (SB10003,Rafael Nadal,3000), (SB10004,Boris Becker,8500), (SB10005,Ivan Lendl,5000))
scala>
多演示几个spark action:
scala> val acNameAndBalance = acJoinTuples.map{case (accno, name,amount) => (name,amount)}
acNameAndBalance: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[13] at map at <console>:25
scala> val acTuplesByAmount = acBalTuples.map{case (accno, amount) => (amount.toDouble, accno)}.sortByKey(false)
acTuplesByAmount: org.apache.spark.rdd.RDD[(Double, String)] = ShuffledRDD[17] at sortByKey at <console>:25
scala> acTuplesByAmount.first()
res7: (Double, String) = (50000.0,SB10001)
scala> acTuplesByAmount.take(3)
res8: Array[(Double, String)] = Array((50000.0,SB10001), (12000.0,SB10002), (8500.0,SB10004))
scala> acBalTuples.countByKey()
res9: scala.collection.Map[String,Long] = Map(SB10001 -> 1, SB10005 -> 1, SB10004 -> 1, SB10002 -> 1, SB10003 -> 1)
scala> acBalTuples.count()
res10: Long = 5
scala> acNameAndBalance.foreach(println)
(Boris Becker,8500)
(Rafael Nadal,3000)
(Roger Federer,50000)
(Pete Sampras,12000)
(Ivan Lendl,5000)
scala> val balanceTotal = sc.accumulator(0.0, "Account Balance Total")
warning: there were two deprecation warnings; re-run with -deprecation for details
balanceTotal: org.apache.spark.Accumulator[Double] = 0.0
(注:sc.accumulator 在 Spark 2.x 中已标记为过时,新代码应改用 sc.doubleAccumulator / sc.longAccumulator,这也是上面出现 deprecation warning 的原因。)
scala> acBalTuples.map{case (accno, amount) => amount.toDouble}.foreach(bal => balanceTotal += bal)
scala> balanceTotal.value
res13: Double = 78500.0
scala>