005 MapReduce and RDD Joins

2019-11-30  逸章

1. MapReduce

This example applies MapReduce-style data processing to retail banking transactions; the use case is as follows (a plain-Scala sketch of the aggregation appears right after the list):

  1. The retail banking transaction records come with account numbers and the transaction amounts in comma-separated strings.
  2. Pair the transactions to have key/value pairs such as (AccNo, TranAmount).
  3. Find an account level summary of all the transactions to get the account balance.
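
Before bringing Spark in, the aggregation itself is easy to see with plain Scala collections. The following is only an illustrative sketch (the value name balances is made up for this example); it uses the same record strings as the spark-shell transcript below and shows the expected account-level balances.

// Plain-Scala sketch of the account-level aggregation (no Spark involved).
val acTransList = List("SB10001,1000", "SB10002,1200", "SB10001,8000",
  "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500",
  "SB10005,56", "SB10003,30", "SB10002,7000", "SB10001,-100", "SB10002,-10")

// Split each record into (AccNo, TranAmount) and sum the amounts per account.
val balances = acTransList
  .map { rec => val Array(accNo, amount) = rec.split(","); (accNo, amount.toDouble) }
  .groupBy(_._1)
  .map { case (accNo, pairs) => (accNo, pairs.map(_._2).sum) }

balances.toSeq.sortBy(_._1).foreach(println)
// Expected: (SB10001,18900.0), (SB10002,8590.0), (SB10003,330.0), (SB10004,500.0), (SB10005,56.0)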

1.1 Using spark-shell

scala> val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10001,8000, SB10002,400, SB10003,300, SB10001,10000, SB10004,500, SB10005,56, SB10003,30, SB10002,7000, SB10001,-100, SB10002,-10)

scala> val acTransRDD = sc.parallelize(acTransList)
acTransRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val acKeyVal = acTransRDD.map(trans => (trans.split(",")(0), trans.split(",")(1).toDouble))
acKeyVal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[1] at map at <console>:25

scala> val accSummary = acKeyVal.reduceByKey(_ + _).sortByKey()
accSummary: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[5] at sortByKey at <console>:25

scala> accSummary.collect()
res0: Array[(String, Double)] = Array((SB10001,18900.0), (SB10002,8590.0), (SB10003,330.0), (SB10004,500.0), (SB10005,56.0))

scala> 
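
Note that reduceByKey combines the values for each key on every partition before the shuffle, so only per-key partial sums travel across the network. A groupByKey-based version would ship every raw (AccNo, amount) pair. The following is a sketch of that less efficient alternative, reusing acKeyVal from the transcript above (accSummary2 is an illustrative name); the result is the same, the shuffle traffic is not.

// Equivalent but less efficient: groupByKey shuffles every raw amount,
// then the sums are computed only after the shuffle.
val accSummary2 = acKeyVal
  .groupByKey()              // (AccNo, Iterable[Double])
  .mapValues(_.sum)          // sum after the shuffle
  .sortByKey()

accSummary2.collect()
// Expected: Array((SB10001,18900.0), (SB10002,8590.0), (SB10003,330.0), (SB10004,500.0), (SB10005,56.0))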

1.2 The same functionality with pyspark

yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./sbin/start-all.sh 
starting org.apache.spark.deploy.master.Master, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.master.Master-1-yay-ThinkPad-T470-W10DG.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/yay/software/spark-2.4.4-bin-hadoop2.7/logs/spark-yay-org.apache.spark.deploy.worker.Worker-1-yay-ThinkPad-T470-W10DG.out
yay@yay-ThinkPad-T470-W10DG:~/software/spark-2.4.4-bin-hadoop2.7$ ./bin/pyspark
Python 2.7.15+ (default, Oct  7 2019, 17:39:04) 
[GCC 7.4.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
19/11/30 20:10:56 WARN Utils: Your hostname, yay-ThinkPad-T470-W10DG resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlp4s0)
19/11/30 20:10:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/30 20:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 2.7.15+ (default, Oct  7 2019 17:39:04)
SparkSession available as 'spark'.
>>> from decimal import Decimal
>>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10001,8000", "SB10002,400", "SB10003,300", "SB10001,10000", "SB10004,500", "SB10005,56", "SB10003,30","SB10002,7000", "SB10001,-100", "SB10002,-10"]
>>> acTransRDD = sc.parallelize(acTransList)
>>> acKeyVal = acTransRDD.map(lambda trans: (trans.split(",")[0],Decimal(trans.split(",")[1])))
>>> accSummary = acKeyVal.reduceByKey(lambda a,b : a+b).sortByKey()
>>> accSummary.collect()                                                        
[('SB10001', Decimal('18900')), ('SB10002', Decimal('8590')), ('SB10003', Decimal('330')), ('SB10004', Decimal('500')), ('SB10005', Decimal('56'))]
>>> 
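
The pyspark version parses the amounts with Decimal instead of float, which keeps the monetary values exact. If the same exactness is wanted on the Scala side, a hedged sketch back in the spark-shell session from section 1.1 could use scala.math.BigDecimal instead of Double (acKeyValExact and accSummaryExact are illustrative names; acTransRDD is the RDD defined above):

// Sketch: exact decimal arithmetic in the Scala version using BigDecimal.
val acKeyValExact = acTransRDD.map { trans =>
  val fields = trans.split(",")
  (fields(0), BigDecimal(fields(1)))
}
val accSummaryExact = acKeyValExact.reduceByKey(_ + _).sortByKey()
accSummaryExact.collect()
// Expected: Array((SB10001,18900), (SB10002,8590), (SB10003,330), (SB10004,500), (SB10005,56))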

2. RDD Joins

Joining two datasets produces a new dataset. Here the account master records (account number, first name, last name) are joined with the account balance records on the account number.

scala> val acMasterList = Array("SB10001,Roger,Federer", "SB10002,Pete,Sampras", "SB10003,Rafael,Nadal", "SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
acMasterList: Array[String] = Array(SB10001,Roger,Federer, SB10002,Pete,Sampras, SB10003,Rafael,Nadal, SB10004,Boris,Becker, SB10005,Ivan,Lendl)

scala> val acBalList = Array("SB10001,50000", "SB10002,12000", "SB10003,3000", "SB10004,8500", "SB10005,5000")
acBalList: Array[String] = Array(SB10001,50000, SB10002,12000, SB10003,3000, SB10004,8500, SB10005,5000)

scala> val acMasterRDD = sc.parallelize(acMasterList)
acMasterRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val acBalRDD = sc.parallelize(acBalList)
acBalRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val acMasterTuples = acMasterRDD.map(master => master.split(",")).map(masterList => (masterList(0), masterList(1) + " " + masterList(2)))
acMasterTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:25

scala> val acBalTuples = acBalRDD.map(trans => trans.split(",")).map(transList => (transList(0), transList(1)))
acBalTuples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[5] at map at <console>:25

scala> val acJoinTuples = acMasterTuples.join(acBalTuples).sortByKey().map{case (accno, (name, amount)) => (accno, name,amount)}
acJoinTuples: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[12] at map at <console>:27

scala> acJoinTuples.collect()
res0: Array[(String, String, String)] = Array((SB10001,Roger Federer,50000), (SB10002,Pete Sampras,12000), (SB10003,Rafael Nadal,3000), (SB10004,Boris Becker,8500), (SB10005,Ivan Lendl,5000))

scala> 
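
join is an inner join: only account numbers present in both RDDs appear in the result. If the master list contained accounts with no balance record, leftOuterJoin would keep them, with the balance side wrapped in an Option. A minimal sketch reusing acMasterTuples and acBalTuples from the transcript above (acLeftJoin is an illustrative name):

// Sketch: keep every master record even when no balance record exists.
// Accounts missing from acBalTuples show up with amountOpt = None.
val acLeftJoin = acMasterTuples
  .leftOuterJoin(acBalTuples)          // (accno, (name, Option[amount]))
  .sortByKey()
  .map { case (accno, (name, amountOpt)) => (accno, name, amountOpt.getOrElse("0")) }

acLeftJoin.collect()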

A few more Spark actions:

scala> val acNameAndBalance = acJoinTuples.map{case (accno, name,amount) => (name,amount)}
acNameAndBalance: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[13] at map at <console>:25

scala> val acTuplesByAmount = acBalTuples.map{case (accno, amount) => (amount.toDouble, accno)}.sortByKey(false)
acTuplesByAmount: org.apache.spark.rdd.RDD[(Double, String)] = ShuffledRDD[17] at sortByKey at <console>:25

scala> acTuplesByAmount.first()
res7: (Double, String) = (50000.0,SB10001)

scala> acTuplesByAmount.take(3)
res8: Array[(Double, String)] = Array((50000.0,SB10001), (12000.0,SB10002), (8500.0,SB10004))

scala> acBalTuples.countByKey()
res9: scala.collection.Map[String,Long] = Map(SB10001 -> 1, SB10005 -> 1, SB10004 -> 1, SB10002 -> 1, SB10003 -> 1)

scala> acBalTuples.count()
res10: Long = 5

scala> acNameAndBalance.foreach(println)
(Boris Becker,8500)
(Rafael Nadal,3000)
(Roger Federer,50000)
(Pete Sampras,12000)
(Ivan Lendl,5000)
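
foreach runs on the executors, so on a real cluster the println output goes to the executor logs and the ordering is not deterministic; it only shows up in the shell here because the example runs locally. To print on the driver in a stable order, one simple alternative is to collect first, as in this sketch:

// Sketch: bring the (name, balance) pairs to the driver and print them there,
// sorted by name so the output order is deterministic.
acNameAndBalance.collect().sortBy(_._1).foreach(println)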

scala> val balanceTotal = sc.accumulator(0.0, "Account Balance Total")
warning: there were two deprecation warnings; re-run with -deprecation for details
balanceTotal: org.apache.spark.Accumulator[Double] = 0.0

scala> acBalTuples.map{case (accno, amount) => amount.toDouble}.foreach(bal => balanceTotal += bal)

scala> balanceTotal.value
res13: Double = 78500.0

scala> 
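
The deprecation warning above appears because sc.accumulator is the old Spark 1.x API. In Spark 2.x the same running total can be kept with the AccumulatorV2-based sc.doubleAccumulator. A hedged sketch reusing acBalTuples (balanceTotal2 is an illustrative name):

// Sketch: the non-deprecated accumulator API in Spark 2.x.
val balanceTotal2 = sc.doubleAccumulator("Account Balance Total")
acBalTuples.foreach { case (accno, amount) => balanceTotal2.add(amount.toDouble) }
balanceTotal2.value   // expected: 78500.0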