Spark-SQL之DataSet操作实战
2019-04-20 本文已影响0人
weare_b646
数据集
MovieLens 1M Dataset
http://files.grouplens.org/datasets/movielens/ml-1m.zip
users.dat
UserID::Gender::Age::Occupation::Zip-code
data:image/s3,"s3://crabby-images/f05c5/f05c5bf06605fc61d865d0e3bf3db2b3c35e577d" alt=""
movies.dat
MovieID::Title::Genres
data:image/s3,"s3://crabby-images/fd7e1/fd7e19d357a362f442260b296aeaabe64c9f220a" alt=""
ratings.dat
UserID::MovieID::Rating::Timestamp
data:image/s3,"s3://crabby-images/9fcea/9fcea0c77d5e9d4d33a3fabbcc90c5c6e5ee50c0" alt=""
maven
<!-- Spark core artifact; the _2.11 suffix is the Scala binary version it was built for -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<!-- Spark SQL artifact; kept on the same Spark version and Scala suffix as spark-core above -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
数据导入
/**
 * One record of users.dat, holding the first three "::"-separated fields.
 *
 * @param uid    user id, kept as the raw string from the file
 * @param gender gender field as it appears in the file
 * @param age    age field parsed to an integer
 */
case class user(
  uid: String,
  gender: String,
  age: Int
)
/**
 * One record of ratings.dat, holding the first three "::"-separated fields.
 *
 * @param uid id of the user who gave the rating
 * @param mid id of the rated movie
 * @param rat rating value parsed to a double
 */
case class rating(
  uid: String,
  mid: String,
  rat: Double
)
/**
 * One record of movies.dat, holding the first two "::"-separated fields.
 *
 * @param mid   movie id, kept as the raw string from the file
 * @param title movie title
 */
case class movie(
  mid: String,
  title: String
)
/**
 * Entry point: starts a local Spark session and loads the three MovieLens files
 * (users.dat, movies.dat, ratings.dat) into DataFrames.
 *
 * @param args optional; args(0) is the directory containing the .dat files.
 *             When absent, the original default "D:/ml-1m/" is used.
 */
def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .master("local[4]")
    .appName("test")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()
  try {
    val sc = spark.sparkContext
    // Kept in scope because the query snippets call sqlContext.sql(...).
    val sqlContext = spark.sqlContext
    // Brings .toDF and the $"col" interpolator into scope.
    import spark.implicits._

    // Data directory: first CLI argument if given (a trailing slash is added when missing).
    val root = args.headOption
      .map(dir => if (dir.endsWith("/")) dir else dir + "/")
      .getOrElse("D:/ml-1m/")

    // Every file uses "::" as its field separator.
    val userRdd = sc.textFile(root + "users.dat").map(_.split("::"))
    val userdf = userRdd.map(x => user(x(0), x(1), x(2).toInt)).toDF
    val movieRdd = sc.textFile(root + "movies.dat").map(_.split("::"))
    val moviedf = movieRdd.map(x => movie(x(0), x(1))).toDF
    val ratingRdd = sc.textFile(root + "ratings.dat").map(_.split("::"))
    val ratingdf = ratingRdd.map(x => rating(x(0), x(1), x(2).toDouble)).toDF

    // Analysis queries that use userdf / moviedf / ratingdf go here, before the session is stopped.
  } finally {
    // Release the local Spark resources even if loading or a query fails.
    spark.stop()
  }
}
年龄段在“18-24”的男性年轻人,最喜欢看哪10部电影
// Male users in the 18-24 age group. In MovieLens 1M the age column is a group code
// (18 means "18-24"), so the range must be bounded on both sides; the original
// "18<=age" also matched every older group.
val youngmale = userdf.filter("age between 18 and 24 and gender='M'")
// Only the (user, movie) pairs are needed to count how often each movie was rated.
val youngratting = ratingdf.select("uid", "mid")
// Ten movies rated by the largest number of those users.
val youngmovies = youngmale
  .join(youngratting, "uid")
  .groupBy("mid")
  .count
  .sort(-$"count")
  .limit(10)
// A join does not keep the row order of its inputs, so sort again before printing the titles.
youngmovies.join(moviedf, "mid").sort(-$"count").select("title").show(false)
data:image/s3,"s3://crabby-images/83e3a/83e3a82f4eb306e8b44d1b436799ab027fe8148e" alt=""
得分最高的10部电影
// Per-movie number of ratings and sum of ratings.
// agg names its output columns "count(uid)" and "sum(rat)"; they are renamed because a name
// containing parentheses cannot be referenced as a plain identifier in the SQL text below
// (it would have to be backtick-quoted).
val top = ratingdf
  .groupBy("mid")
  .agg("uid" -> "count", "rat" -> "sum")
  .withColumnRenamed("count(uid)", "count")
  .withColumnRenamed("sum(rat)", "sum")
top.createOrReplaceTempView("top")
// Average score per movie, keeping the ten highest.
val top10 = sqlContext.sql("select mid,(sum/count) as avgscore from top").sort(-$"avgscore").limit(10)
// A join does not keep the row order of its inputs, so sort again before printing the titles.
top10.join(moviedf, "mid").sort(-$"avgscore").select("title").show(false)
data:image/s3,"s3://crabby-images/85d00/85d0043e154ac392456955f68d919186a034e800" alt=""
看过电影最多的前10个人
// Number of ratings per user, largest first; print the first ten rows.
val ratingsPerUser = ratingdf.groupBy("uid").count()
ratingsPerUser
  .orderBy(-$"count")
  .show(10)
data:image/s3,"s3://crabby-images/1fe24/1fe2443bc240a6962bd862f82495a89476eeb6c1" alt=""
男性看过最多的10部电影
// Ids of all male users.
val male = userdf.filter("gender='M'").select("uid")
// (user, movie) pairs of every rating; also reused by the female query.
val allmovies = ratingdf.select("uid", "mid")
// Ten movies rated by the largest number of male users.
val maleid = male
  .join(allmovies, "uid")
  .groupBy("mid")
  .count
  .sort(-$"count")
  .limit(10)
// A join does not keep the row order of its inputs (repartitioning does not restore it either),
// so sort again before printing the titles.
maleid.join(moviedf, "mid").sort(-$"count").select("title").show(false)
data:image/s3,"s3://crabby-images/fea6f/fea6f547086763fd78f16b57c158371c219a397c" alt=""
女性看过最多的10部电影
// Ids of all female users.
val female = userdf.filter("gender='F'").select("uid")
// Ten movies rated by the largest number of female users
// (allmovies is the (uid, mid) projection of ratingdf defined for the male query).
val femaleid = female
  .join(allmovies, "uid")
  .groupBy("mid")
  .count
  .sort(-$"count")
  .limit(10)
// A join does not keep the row order of its inputs, so sort again before printing the titles.
femaleid.join(moviedf, "mid").sort(-$"count").select("title").show(false)
data:image/s3,"s3://crabby-images/4ead9/4ead9c8d1666ca8d046ffe582f02ea7babd6aea0" alt=""