2020-11-27-Spark-6(Spark-Core)

2020-12-05  本文已影响0人  冰菓_

spark练习题
处理数据上的分组和业务需求上的分组

1.案例topN(要点使用模式匹配重新分组)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//需求:统计出每一个省份广告被点击次数的TOP3
//数据的示例:
//1516609143869 4 6 1 11
//1516609143869 3 6 49 7
//1516609143869 8 3 4 18
//1516609143869 8 8 69 14
//1516609143869 0 6 51 29
//1516609143869 5 3 59 2
//1516609143869 8 4 66 25

//需求:时间戳 省份 用户 城市 广告
object Test7 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test7").setMaster("local[*]"))
    val rdd = sc.textFile("src/main/resources/top.txt")
    //由于shuffle传输的字节越多性能越差,只需要需求的数据
    val maprdd: RDD[((String, String), Int)] = rdd.map(data => {
      val th = data.split(" ")
      //结构为 ((省份,城市),广告数)
      ((th(1), th(3)), th(4).toInt)
    })
    //使用效率高的reducebykey,求取出(省份,城市)分组内的所有广告数
    val reducerdd: RDD[((String, String), Int)] = maprdd.reduceByKey(_ + _)
    //使用case 模式匹配重新分组
    val mapardd = reducerdd.map(data => data match {
      case ((province, city), add) => (province, (city, add))
    })
    val grouprdd: RDD[(String, Iterable[(String, Int)])] = mapardd.groupByKey()
    //获取前三
    val result: RDD[(String, List[(String, Int)])] = grouprdd.mapValues(data => data.toList.sortBy(_._2)(Ordering.Int.reverse).take(3))
    result.collect.foreach(println)
    sc.stop()
  }
}

2.基础练习题(过滤求和,最值问题,平均值的多解法效率,join)

12 宋江 25 男 chinese 50
12 宋江 25 男 math 60
12 宋江 25 男 english 70
12 吴用 20 男 chinese 50
12 吴用 20 男 math 50
12 吴用 20 男 english 50
12 杨春 19 女 chinese 70
12 杨春 19 女 math 70
12 杨春 19 女 english 70
13 李逵 25 男 chinese 60
13 李逵 25 男 math 60
13 李逵 25 男 english 70
13 林冲 20 男 chinese 50
13 林冲 20 男 math 60
13 林冲 20 男 english 50
13 王英 19 女 chinese 70
13 王英 19 女 math 80
13 王英 19 女 english 70

1.一共有多少个小于20岁的人参加考试?

 val result = rdd.map(data => data.split(" ")).filter(x => x(2).toInt < 20).groupBy(_(1)).count()

2.一共有多少个女生参加考试?

 val result = rdd.map(_.split(" ")).filter(_ (3) == "男").groupBy(_(1)).count()

3.语文科目的平均成绩是多少?

    val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
    val rdd = sc.textFile("src/main/resources/sanguo.txt")
    rdd.map(data => {
      val number = data.split(" ")
      ((number(4), number(5).toInt))
    }).filter(_._1 == "chinese").aggregateByKey((0, 0))(
      (u, v) => ((u._1 + v), u._2 + 1),
      (x, x1) => ((x._1 + x1._1), (x._2 + x1._2))
    ).mapValues(data => data._1 / data._2).collect.foreach(println)
    sc.stop()

4.13班数学最高成绩是多少

object Test2 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
    val rdd = sc.textFile("src/main/resources/sanguo.txt")
    val strings: Int = rdd.map(_.split(" ")).filter(_ (0) == "13").filter(_ (4) == "math").map(_ (5).toInt).max()
      //.sortBy(_.toInt,false).take(1)
    println(strings)
    sc.stop()
  }
}

5.总成绩大于150分的12班的女生有几个?

object Test3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
    val rdd = sc.textFile("src/main/resources/sanguo.txt")
    val filtrdd: RDD[Array[String]] = rdd.map(_.split(" ")).filter(data => data(0).equals("12") && data(3).equals("女"))
    val rerdd: RDD[(String, Int)] = filtrdd.map(data => (data(1), data(5).toInt)).reduceByKey(_ + _)
    rerdd.collect.foreach(println)
    val result: Long = rerdd.filter(_._2 > 150).count()
    println(result)
    sc.stop()
  }
}

6.总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?
object Test4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test1").setMaster("local[*]"))
    val data = sc.textFile("src/main/resources/sanguo.txt")
    //首先求取总成绩大于150的姓名和总成绩
    val mapdd = data.map(txt => {
      val strings = txt.split(" ")
      (strings(1), (strings(4),strings(5).toInt))
    })
    //获取总成绩和科目数
    val aggrdd: RDD[(String, (Int, Int))] = mapdd.aggregateByKey((0, 0))(
      (u, v) => ((u._1 + v._2), u._2 + 1),
      (x, x1) => ((x._1 + x1._1), x._2 + x1._2)
    )
    //得到的是总成绩大于150的姓名和平均值
    val re1: RDD[(String, Int)] = aggrdd.filter(_._2._1 > 150).mapValues(data => data._1 / data._2)
    //(王英,73)
    //(宋江,60)
    //(杨春,70)
    //(李逵,63)
    //(林冲,53)

    //且数学大于等于70,且年龄大于等于19岁的学生
    val maprdd: RDD[Array[String]] = data.map(_.split(" "))
    val filtrdd: RDD[Array[String]] = maprdd.filter(data => data(2).toInt >= 19 && data(4).equals("math") && data(5).toInt >= 70)
    val re2: RDD[(String, Int)] = filtrdd.map(data => {
      (data(1), data(5).toInt)
    })
    //join连接
    val result: RDD[(String, Int)] = re1.join(re2).map(data => {
      (data._1, data._2._1)
    })
    result.collect.foreach(println)
    sc.stop()
    //(王英,73)
    //(杨春,70)
  }
}
上一篇 下一篇

猜你喜欢

热点阅读