spark经典案例之统计每天新增用户数
2019-04-25 本文已影响0人
printf200
前言
本文源自一位群友的一道美团面试题,解题思路(基于倒排索引)。
1、原始数据
2017-01-01 a
2017-01-01 b
2017-01-01 c
2017-01-02 a
2017-01-02 b
2017-01-02 d
2017-01-03 b
2017-01-03 e
2017-01-03 f
根据数据可以看出我们要求的结果为:
2017-01-01 新增三个用户(a,b,c)
2017-01-02 新增一个用户(d)
2017-01-03 新增两个用户(e,f)
2、解题思路
2.1 对原始数据进行倒排索引
结果如下:
用户名 | 列一 | 列二 | 列三 |
---|---|---|---|
a | 2017-01-01 | 2017-01-02 | |
b | 2017-01-01 | 2017-01-02 | 2017-01-03 |
c | 2017-01-01 | ||
d | 2017-01-02 | ||
e | 2017-01-03 | ||
f | 2017-01-03 |
2.2 统计列一中每个日期出现的次数
这样我们只看列一,统计每个日期在列一出现的次数,即为对应日期新增用户数。
3、代码
package com.dkl.leanring.spark.test
import org.apache.spark.sql.SparkSession
object NewUVDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("NewUVDemo").master("local").getOrCreate()
val rdd1 = spark.sparkContext.parallelize(
Array(
("2017-01-01", "a"), ("2017-01-01", "b"), ("2017-01-01", "c"),
("2017-01-02", "a"), ("2017-01-02", "b"), ("2017-01-02", "d"),
("2017-01-03", "b"), ("2017-01-03", "e"), ("2017-01-03", "f")))
//倒排
val rdd2 = rdd1.map(kv => (kv._2, kv._1))
//倒排后的key分组
val rdd3 = rdd2.groupByKey()
//取最小时间
val rdd4 = rdd3.map(kv => (kv._2.min, 1))
rdd4.countByKey().foreach(println)
}
}
结果:
(2017-01-03,2)
(2017-01-02,1)
(2017-01-01,3)
方法二
object AddCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("filedistinct").setMaster("local")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("demo6/*")
rdd.filter(x => x.length>0).map(x => (x.split("\\s+")(1),x.split("\\s+")(0))).groupByKey().map(x => {
(x._2.min,1)
}).reduceByKey(_+_).sortByKey().foreach(println)
}
}