spark经典案例之统计每天新增用户数

2019-04-25  本文已影响0人  printf200

前言

本文源自一位群友的一道美团面试题,解题思路(基于倒排索引)。

1、原始数据

2017-01-01  a
2017-01-01  b
2017-01-01  c
2017-01-02  a
2017-01-02  b
2017-01-02  d
2017-01-03  b
2017-01-03  e
2017-01-03  f

根据数据可以看出我们要求的结果为:
2017-01-01 新增三个用户(a,b,c)
2017-01-02 新增一个用户(d)
2017-01-03 新增两个用户(e,f)

2、解题思路

2.1 对原始数据进行倒排索引

结果如下:

用户名 列一 列二 列三
a 2017-01-01 2017-01-02
b 2017-01-01 2017-01-02 2017-01-03
c 2017-01-01
d 2017-01-02
e 2017-01-03
f 2017-01-03

2.2 统计列一中每个日期出现的次数

这样我们只看列一,统计每个日期在列一出现的次数,即为对应日期新增用户数。

3、代码

package com.dkl.leanring.spark.test

import org.apache.spark.sql.SparkSession

object NewUVDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("NewUVDemo").master("local").getOrCreate()
    val rdd1 = spark.sparkContext.parallelize(
      Array(
        ("2017-01-01", "a"), ("2017-01-01", "b"), ("2017-01-01", "c"),
        ("2017-01-02", "a"), ("2017-01-02", "b"), ("2017-01-02", "d"),
        ("2017-01-03", "b"), ("2017-01-03", "e"), ("2017-01-03", "f")))
    //倒排
    val rdd2 = rdd1.map(kv => (kv._2, kv._1))
    //倒排后的key分组
    val rdd3 = rdd2.groupByKey()
    //取最小时间
    val rdd4 = rdd3.map(kv => (kv._2.min, 1))
    rdd4.countByKey().foreach(println)
  }
}

结果:

(2017-01-03,2)
(2017-01-02,1)
(2017-01-01,3)

方法二

object AddCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("filedistinct").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("demo6/*")

    rdd.filter(x => x.length>0).map(x => (x.split("\\s+")(1),x.split("\\s+")(0))).groupByKey().map(x => {
      (x._2.min,1)
    }).reduceByKey(_+_).sortByKey().foreach(println)
  }
}
上一篇下一篇

猜你喜欢

热点阅读