Spark-SQL之DataSet操作实战

2018-11-05  本文已影响0人  数据萌新

数据集
MovieLens 1M Dataset
http://files.grouplens.org/datasets/movielens/ml-1m.zip

users.dat
UserID::Gender::Age::Occupation::Zip-code


image.png

movies.dat
MovieID::Title::Genres


image.png

ratings.dat
UserID::MovieID::Rating::Timestamp


image.png

maven

 <!-- Spark core (provides SparkContext and the RDD API), build for Scala 2.11 -->
 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
       <!-- Spark SQL (provides SparkSession and DataFrames); same version and Scala build as spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>

数据导入

  // Row types for the three MovieLens files. Only the leading "::"-separated fields that
  // main() parses are modelled; the remaining fields of each line are ignored.
  case class user(uid: String, gender: String, age: Int)
  case class rating(uid: String, mid: String, rat: Double)
  case class movie(mid: String, title: String)

  /**
   * Starts a local Spark session and loads users.dat, movies.dat and ratings.dat
   * into the DataFrames `userdf`, `moviedf` and `ratingdf`.
   *
   * @param args optional; args(0) overrides the dataset directory (default "D:/ml-1m/")
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[4]")
      .appName("test")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()

    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext
    // Brings toDF and the $"column" syntax into scope.
    import sqlContext.implicits._

    // Dataset directory: first command-line argument if present, otherwise the original
    // default. A trailing "/" is appended when missing so file names can be concatenated.
    val dir = args.headOption.getOrElse("D:/ml-1m/")
    val root = if (dir.endsWith("/")) dir else dir + "/"

    // users.dat: UserID::Gender::Age::... -> keep the first three fields.
    val userRdd = sc.textFile(root + "users.dat").map(_.split("::"))
    val userdf = userRdd.map(fields => user(fields(0), fields(1), fields(2).toInt)).toDF

    // movies.dat: MovieID::Title::Genres -> keep id and title.
    // NOTE(review): textFile decodes as UTF-8; confirm the encoding of movies.dat if titles
    // with accented characters print garbled.
    val movieRdd = sc.textFile(root + "movies.dat").map(_.split("::"))
    val moviedf = movieRdd.map(fields => movie(fields(0), fields(1))).toDF

    // ratings.dat: UserID::MovieID::Rating::Timestamp -> the timestamp is dropped.
    val ratingRdd = sc.textFile(root + "ratings.dat").map(_.split("::"))
    val ratingdf = ratingRdd.map(fields => rating(fields(0), fields(1), fields(2).toDouble)).toDF
  }

年龄段在“18-24”的男性年轻人,最喜欢看哪10部电影

// Ten movies rated by the most male users in the 18-24 age bracket, most-rated first.
// MovieLens 1M stores age as a bracket code (1, 18, 25, 35, 45, 50, 56) and the code 18
// stands for "18-24", so the filter must match it exactly; "18<=age" would also admit
// every older bracket.
val youngmale = userdf.filter("age = 18 and gender = 'M'")
val youngratting = ratingdf.select("uid", "mid")
val youngcounts = youngmale.join(youngratting, "uid").groupBy("mid").count
val youngmovies = youngcounts.sort($"count".desc).limit(10)
// A join does not preserve the order of its inputs, so rank again before printing.
youngmovies.join(moviedf, "mid").sort($"count".desc).select("title").show(false)
image.png

得分最高的10部电影

  // Ten best-scoring movies by mean rating (sum of ratings / number of ratings).
  // The aggregate columns are renamed because their generated names, "count(uid)" and
  // "sum(rat)", contain parentheses: inside a SQL statement they would be parsed as
  // function calls on columns that the view does not have, unless quoted with backticks.
  val top = ratingdf.groupBy("mid")
    .agg("uid" -> "count", "rat" -> "sum")
    .withColumnRenamed("count(uid)", "count")
    .withColumnRenamed("sum(rat)", "sum")
  top.createOrReplaceTempView("top")
  val scored = sqlContext.sql("select mid,(sum/count) as avgscore from top")
  val top10 = scored.sort($"avgscore".desc).limit(10)
  // A join does not preserve the order of its inputs, so rank again before printing.
  top10.join(moviedf, "mid").sort($"avgscore".desc).select("title").show(false)
image.png

看过电影最多的前10个人

// Ten users who rated the most movies: count ratings per user, then order by the
// negated count so that the largest counts come first.
val ratingsPerUser = ratingdf.groupBy("uid").count()
ratingsPerUser.orderBy(-$"count").show(10)
image.png

男性看过最多的10部电影

    // Ten movies rated by the most male users, most-rated first.
    val male = userdf.filter("gender='M'").select("uid")
    // (uid, mid) pairs of every rating; the female query further down reuses this value.
    val allmovies = ratingdf.select("uid", "mid")
    val maleid = male.join(allmovies, "uid").groupBy("mid").count.sort($"count".desc).limit(10)
    // The join below does not keep maleid's order (adding .repartition($"count") had no
    // effect either), so the rows are ranked again before printing.
    maleid.join(moviedf, "mid").sort($"count".desc).select("title").show(false)
image.png

女性看过最多的10部电影

   // Ten movies rated by the most female users, most-rated first.
   // `allmovies` is the (uid, mid) projection of ratingdf defined in the male query.
   val female = userdf.filter("gender='F'").select("uid")
   val femaleid = female.join(allmovies, "uid").groupBy("mid").count.sort($"count".desc).limit(10)
   // A join does not preserve the order of its inputs, so rank again before printing.
   femaleid.join(moviedf, "mid").sort($"count".desc).select("title").show(false)
image.png
上一篇下一篇

猜你喜欢

热点阅读