SparkCore TopN Exercise

2020-12-07  羋学僧

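1. Requirements

For each day, find the top ten categories ranked by [click], [order], and [pay] counts.

Note: this is a secondary sort. When two categories have the same click count, compare their order counts; when the order counts are also equal, compare their pay counts.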
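
The comparison rule above is a lexicographic ordering on the triple (click, order, pay). As a minimal sketch in plain Scala (no Spark), it can be expressed by sorting on a tuple. The `CategoryCount` case class and the `SecondarySortSketch` object are hypothetical names used only for illustration; they are not part of the code in section 5.

```scala
// Hypothetical holder for one category's counts (not part of the original code).
final case class CategoryCount(categoryId: Long, click: Long, order: Long, pay: Long)

object SecondarySortSketch {
  // Sort descending by click count, breaking ties by order count, then by pay count.
  // Scala's built-in tuple Ordering compares element by element, left to right.
  def rank(counts: Seq[CategoryCount]): Seq[CategoryCount] =
    counts.sortBy(c => (c.click, c.order, c.pay))(Ordering[(Long, Long, Long)].reverse)

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      CategoryCount(3L, 0L, 4L, 4L),
      CategoryCount(2L, 2L, 5L, 4L),
      CategoryCount(4L, 1L, 0L, 1L),
      CategoryCount(1L, 2L, 5L, 5L)
    )
    rank(sample).foreach(println)
  }
}
```

Categories 1 and 2 tie on clicks and orders, so the pay count decides that category 1 comes first.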

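2. Data format

date:
   date  2020-03-12
user_id:
   user id  user123
session_id:
   session id  XXXXXYYYYY
page_id:
   page id  1
action:
   access timestamp  1520769809971
city_id:
   city the user is in  1
search_keywords:
   search keywords  "麻辣小龙虾|火锅鱼"
[click_category_id]:
   id of the category the user clicked  1
click_product_id:
   id of the product the user clicked  1
[order_category_id]:
   ids of the categories the user ordered  1^A2  # categories 1 and 2
order_product_id:
   ids of the products the user ordered  1^A2^A3  # products 1, 2, and 3
[pay_category_id]:
   ids of the categories the user paid for  1^A2
pay_product_id:
   ids of the products the user paid for  1^A2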

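3. Data survey

The data looks like this. Fields are separated by commas, and the values inside a multi-valued field are separated by ^A: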

2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,"麻辣小龙虾|火锅鱼",1,1,1^A2,1^A2^A3,1^A2,1^A2
2020-03-11,user1234,XX55YYYYY,1,1520769809972,1,"小龙虾|火锅鱼",1,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809973,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809974,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809975,1,"小龙虾|火锅鱼",4,1,1^A2^A3,1^A2^A3,1^A2^A3^A4,1^A2^A3

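The data is simulated. It mixes click logs, order logs, and pay logs, and each log line is exactly one of the three. In a click log, the order and pay category ids are empty and only the click category id is set; the same holds, correspondingly, for order logs and pay logs. A click always targets a single category, so a click log contains exactly one category id, whereas an order log or a pay log may contain several. (The mock lines shown here fill in all three fields on every line.)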
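
As a rough sketch of how one such line could be taken apart, the snippet below extracts only the three category fields. It assumes the delimiter is the two literal characters `^` and `A`, as it appears in the sample, and that the keyword field contains no comma. `LogLineSketch`, `CategoryFields`, and `parse` are hypothetical names, not part of the code in section 5.

```scala
object LogLineSketch {
  // Hypothetical record holding only the three category fields used in this exercise.
  final case class CategoryFields(click: Option[Long], order: Seq[Long], pay: Seq[Long])

  // Assumption: the file contains the literal characters '^' and 'A' as the delimiter.
  private val Delimiter = "^A"

  // An empty field yields no ids; otherwise split on the literal delimiter.
  private def splitIds(field: String): Seq[Long] =
    if (field.trim.isEmpty) Seq.empty
    else field.split(java.util.regex.Pattern.quote(Delimiter)).map(_.trim.toLong).toSeq

  def parse(line: String): CategoryFields = {
    // limit = -1 keeps trailing empty fields, so a click-only line still has all columns.
    val fields = line.split(",", -1)
    CategoryFields(
      click = splitIds(fields(7)).headOption, // a click log holds at most one category
      order = splitIds(fields(9)),
      pay = splitIds(fields(11))
    )
  }
}
```

The `-1` limit matters for click logs: `String.split` with the default limit drops trailing empty strings, so a line whose last fields are empty would otherwise be too short to index at position 9 or 11.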

4. Result

The computed result is:

category_id=1|click_category_count=2|order_category_count=5|pay_category_count=5
category_id=2|click_category_count=2|order_category_count=5|pay_category_count=4
category_id=4|click_category_count=1|order_category_count=0|pay_category_count=1
category_id=3|click_category_count=0|order_category_count=4|pay_category_count=4
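
To see where these numbers come from, the counts can be recomputed from the five sample lines of section 3 with plain Scala collections. This is only a sketch for checking the arithmetic, not the Spark job itself; `SampleCountsSketch` and its helpers are names made up for illustration, and the `^A` delimiter is assumed to be the two literal characters.

```scala
object SampleCountsSketch {
  // The five sample lines from section 3.
  val lines: Seq[String] = Seq(
    """2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,"麻辣小龙虾|火锅鱼",1,1,1^A2,1^A2^A3,1^A2,1^A2""",
    """2020-03-11,user1234,XX55YYYYY,1,1520769809972,1,"小龙虾|火锅鱼",1,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3""",
    """2020-03-11,user1234,XX55YYYYY,1,1520769809973,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A3,1^A2^A3""",
    """2020-03-11,user1234,XX55YYYYY,1,1520769809974,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3""",
    """2020-03-11,user1234,XX55YYYYY,1,1520769809975,1,"小龙虾|火锅鱼",4,1,1^A2^A3,1^A2^A3,1^A2^A3^A4,1^A2^A3"""
  )

  // Split one multi-valued field on the literal "^A"; an empty field yields no ids.
  private def ids(field: String): Seq[Long] =
    if (field.trim.isEmpty) Seq.empty else field.split("\\^A").map(_.trim.toLong).toSeq

  // Count how often each category id occurs in the given column across all lines.
  private def countColumn(column: Int): Map[Long, Long] =
    lines.flatMap(l => ids(l.split(",", -1)(column)))
      .groupBy(identity)
      .map { case (id, occurrences) => id -> occurrences.size.toLong }

  def report(): Seq[String] = {
    val click = countColumn(7)  // click_category_id
    val order = countColumn(9)  // order_category_id
    val pay = countColumn(11)   // pay_category_id
    (click.keySet ++ order.keySet ++ pay.keySet).toSeq
      .map(id => (id, click.getOrElse(id, 0L), order.getOrElse(id, 0L), pay.getOrElse(id, 0L)))
      .sortBy { case (_, c, o, p) => (c, o, p) }(Ordering[(Long, Long, Long)].reverse)
      .map { case (id, c, o, p) =>
        s"category_id=$id|click_category_count=$c|order_category_count=$o|pay_category_count=$p"
      }
  }

  def main(args: Array[String]): Unit = report().foreach(println)
}
```

Category 3 is never clicked, so its click count falls back to 0, which is the role the left joins play in the Spark version.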

5. Code

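import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SparkTopN {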
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${Constants.SPARK_APP_NAME}").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("D:\\Users\\newhopedata\\IdeaProjects\\SparkScalaWork\\src\\main\\scala\\nx\\core\\log.txt")


    /**
      * Step 1:
      *  collect all category ids
      */
    val allCategoryid = getAllCategoryID(rdd).distinct()

    /**
      * Step 2:
      *  count, per category, the number of
      *    clicks, orders, and payments
      */
    val clickCategoryCount = getClickCategoryCount(rdd)
    val orderCategoryCount = getOrderCategoryCount(rdd)
    val payCategoryCount = getPayCategoryCount(rdd)

    /**
      * Step 3:
      * left-join the result of step 1 with the results of step 2
      */
    val resultRDD: RDD[(Long, String)] = leftJoinData(allCategoryid,clickCategoryCount,orderCategoryCount,payCategoryCount)

    /**
      * Step 4:
      * apply the secondary sort
      */
    getTopN(resultRDD)

    sc.stop()
  }

  /**
    * Collect every category id that appears in a click, order, or pay field.
    * @param rdd raw log lines
    * @return (category_id, category_id) pairs, possibly with duplicates
    */
  def getAllCategoryID(rdd: RDD[String]):RDD[(Long,Long)]={
    rdd.flatMap( line =>{
      // Build the set per line. A single set declared outside the closure would keep
      // growing within a task and re-emit every id seen so far for each line.
      val ids = new mutable.HashSet[(Long,Long)]
      // limit -1 keeps trailing empty fields, so click-only lines can still be indexed at 9 and 11
      val fields = line.split(",", -1)
      val click_category_id = fields(7)
      val order_category_id = fields(9)
      val pay_category_id = fields(11)
      if(click_category_id != null && !click_category_id.trim.equals("")){
        ids+=((click_category_id.toLong,click_category_id.toLong))
      }
      if(order_category_id != null && !order_category_id.trim.equals("")){
        for(categoryid <- order_category_id.split(s"\\${Constants.SPLIT_FIELDS}")){
          ids += ((categoryid.toLong,categoryid.toLong))
        }
      }
      if(pay_category_id != null && !pay_category_id.trim.equals("")){
        for(categoryid <- pay_category_id.split(s"\\${Constants.SPLIT_FIELDS}")){
          ids += ((categoryid.toLong,categoryid.toLong))
        }
      }

      ids
    })
  }

  /**
    * Count the clicks per category.
    * @param rdd raw log lines
    * @return (category_id, click count) pairs
    */
  def getClickCategoryCount(rdd: RDD[String]):RDD[(Long,Long)]={
    rdd.filter( line =>{
      // limit -1 keeps trailing empty fields
      val fields = line.split(",", -1)
      fields(7) != null && !fields(7).trim.equals("")
    }).map( line =>{
      val click_category_id = line.split(",", -1)(7).toLong
      (click_category_id,1L)
    }).reduceByKey(_+_)
  }

  /**
    * Count the orders per category.
    * @param rdd raw log lines
    * @return (category_id, order count) pairs
    */
  def getOrderCategoryCount(rdd: RDD[String]):RDD[(Long,Long)]={
    rdd.filter( line =>{
      // limit -1 keeps trailing empty fields, so index 9 exists even on click-only lines
      val field = line.split(",", -1)(9)
      field != null && !field.trim.equals("")
    }).flatMap( line =>{
      line.split(",", -1)(9).split(s"\\${Constants.SPLIT_FIELDS}")
    }).map( order_category_id =>{
      (order_category_id.toLong,1L)
    }).reduceByKey(_+_)
  }

  /**
    * Count the payments per category.
    * @param rdd raw log lines
    * @return (category_id, pay count) pairs
    */
  def getPayCategoryCount(rdd: RDD[String]):RDD[(Long,Long)]={
    rdd.filter( line =>{
      // limit -1 keeps trailing empty fields, so index 11 exists even on click-only lines
      val field = line.split(",", -1)(11)
      field != null && !field.trim.equals("")
    }).flatMap( line =>{
      line.split(",", -1)(11).split(s"\\${Constants.SPLIT_FIELDS}")
    }).map( pay_category_id =>{
      (pay_category_id.toLong,1L)
    }).reduceByKey(_+_)
  }

  /**
    * Left-join all category ids with the click, order, and pay counts,
    * building one "name=value|..." string per category.
    */
  def leftJoinData(
                  allCategoryID2ID:RDD[(Long,Long)],
                  clickCategoryidAndCount:RDD[(Long,Long)],
                  orderCategoryidAndCount:RDD[(Long,Long)],
                  payCategoryidAndCount:RDD[(Long,Long)]
                  ): RDD[(Long,String)] ={
    /**
      * Key:   Long, the category id
      * Value: (Long, Option[Long]), the category id and its click count:
      *   Some(count) if the category was clicked
      *   None        otherwise
      */
    val resultRDD: RDD[(Long, (Long, Option[Long]))] =
                            allCategoryID2ID.leftOuterJoin(clickCategoryidAndCount)

    val result2RDD: RDD[(Long, (String, Option[Long]))] = resultRDD.map(tuple => {
      val category_id = tuple._1
      // A category without clicks gets a count of 0
      val click_category_count = tuple._2._2.getOrElse(0L)

      val value = s"${Constants.CATEGORY_ID}=" + category_id + "|" + s"${Constants.CLICK_CATEGORY_COUNT}=" + click_category_count
      (category_id, value)
    }).leftOuterJoin(orderCategoryidAndCount)


    val result3RDD = result2RDD.map(tuple => {
      val category_id = tuple._1
      val count = tuple._2._2.getOrElse(0L)
      //category_id=1|click_category_count=5|order_category_count=4
      val value = tuple._2._1 + "|" + s"${Constants.ORDER_CATEGORY_COUNT}=" + count
      (category_id, value)
    }).leftOuterJoin(payCategoryidAndCount)

    result3RDD.map( tuple =>{
      val category_id = tuple._1
      val count = tuple._2._2.getOrElse(0L)
      //category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1
      val value = tuple._2._1 + "|" + s"${Constants.PAY_CATEGORY_COUNT}=" + count
      (category_id,value)
    })
  }

  /**
    * Apply the secondary sort:
    * order by click, order, and pay counts (descending) and print the top ten.
    *
    * @param resultRDD (category_id, "category_id=..|click_category_count=..|..") pairs
    */
  def getTopN(resultRDD: RDD[(Long, String)]): Unit ={
      resultRDD.map( tuple =>{
        val value = tuple._2
        //category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1
        val parts = value.split("\\|")
        val click_count = parts(1).split("=")(1).toLong
        val order_count = parts(2).split("=")(1).toLong
        val pay_count = parts(3).split("=")(1).toLong
        val key = new Sortkey(click_count,order_count,pay_count)
        // In a real project, returning just the category_id here would be enough.
        // The full value string is returned only to make the result easy to inspect.
        (key,value)
      }).sortByKey(false)
      // take(10) brings the ten largest keys to the driver in sorted order;
      // foreach on the RDD itself would run on the executors and would not stop at ten.
      .take(10)
      .foreach( tuple =>{
        println(tuple._2)

        /***
          * Output for the sample data:
          *
          * category_id=1|click_category_count=2|order_category_count=5|pay_category_count=5
          * category_id=2|click_category_count=2|order_category_count=5|pay_category_count=4
          * category_id=4|click_category_count=1|order_category_count=0|pay_category_count=1
          * category_id=3|click_category_count=0|order_category_count=4|pay_category_count=4
          */
      })

  }


}
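
The listing refers to `Sortkey` and `Constants`, which are not shown. The following is a minimal sketch of what they might look like, not the author's original definitions. The field-name constants are inferred from the output lines in section 4, `SPLIT_FIELDS` is assumed to be the two literal characters `^A` as in the sample data, and the value of `SPARK_APP_NAME` is a placeholder.

```scala
// Assumed definitions; the original post does not include them.
object Constants {
  val SPARK_APP_NAME = "SparkTopN"                   // placeholder value
  val SPLIT_FIELDS = "^A"                            // assumed from the sample data
  val CATEGORY_ID = "category_id"                    // inferred from the output lines
  val CLICK_CATEGORY_COUNT = "click_category_count"  // inferred from the output lines
  val ORDER_CATEGORY_COUNT = "order_category_count"  // inferred from the output lines
  val PAY_CATEGORY_COUNT = "pay_category_count"      // inferred from the output lines
}

// Sort key for sortByKey: compares click count first, then order count, then pay count.
// It is Serializable because Spark moves keys between tasks during the shuffle.
class Sortkey(val clickCount: Long, val orderCount: Long, val payCount: Long)
    extends Ordered[Sortkey] with Serializable {

  override def compare(that: Sortkey): Int =
    if (clickCount != that.clickCount) java.lang.Long.compare(clickCount, that.clickCount)
    else if (orderCount != that.orderCount) java.lang.Long.compare(orderCount, that.orderCount)
    else java.lang.Long.compare(payCount, that.payCount)
}
```

Because `Sortkey` extends `Ordered[Sortkey]`, Scala can derive the implicit `Ordering` that `sortByKey` needs, and `sortByKey(false)` then yields the keys from largest to smallest.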
