Spark from Beginner to Mastery, Part 65: Dataset Aggregate Functions
2020-07-20
勇于自信
1. Basic Dataset aggregate functions
The basic Dataset aggregate functions are:
avg, sum, max, min, count, countDistinct
Hands-on example:
Input data:
employee.json:
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}
department.json:
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}
Code:
package com.spark.ds

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggregateFunction {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("AggregateFunction")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
      .getOrCreate()
    import spark.implicits._

    val employee = spark.read.json("inputData/employee.json")
    val department = spark.read.json("inputData/department.json")

    employee
      .join(department, $"depId" === $"id")
      .groupBy(department("name"))
      .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")),
        min(employee("salary")), count(employee("name")), countDistinct(employee("name")))
      .show()
  }
}
Output:
+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
| name| avg(salary)|sum(salary)|max(salary)|min(salary)|count(name)|count(DISTINCT name)|
+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
|Technical Department| 17500.0| 35000| 20000| 15000| 2| 2|
| HR Department| 19500.0| 39000| 21000| 18000| 2| 2|
|Financial Department|20333.333333333332| 61000| 28000| 8000| 3| 2|
+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
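As a sanity check on the Financial Department row above (sum 61000, avg ≈ 20333.33, count 3, distinct names 2 because "Jen" appears twice), the same aggregates can be reproduced with plain Scala collections. The values below are copied directly from employee.json:

```scala
object AggSanityCheck extends App {
  // Financial Department (depId = 2) rows from employee.json: (name, salary)
  val finance = Seq(("Marry", 25000), ("Jen", 28000), ("Jen", 8000))

  val salaries = finance.map(_._2)
  val totalSalary   = salaries.sum                              // sum(salary)
  val avgSalary     = salaries.sum.toDouble / salaries.size     // avg(salary)
  val maxSalary     = salaries.max                              // max(salary)
  val minSalary     = salaries.min                              // min(salary)
  val nameCount     = finance.size                              // count(name)
  val distinctNames = finance.map(_._1).distinct.size           // count(DISTINCT name)

  println(s"sum=$totalSalary avg=$avgSalary max=$maxSalary " +
    s"min=$minSalary count=$nameCount distinct=$distinctNames")
}
```

This matches the Financial Department row in the `show()` output: 61000, 20333.333..., 28000, 8000, 3, and 2.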
2. Dataset collection aggregate functions: collect_list and collect_set
Both collect_list and collect_set gather the values of a given field within the same group into an array. They are commonly used to pivot rows into columns, for example:
depId=1, employee=leo
depId=1, employee=jack
depId=1, employees=[leo, jack]
Here, collect_list does not deduplicate the list, while collect_set removes duplicate values.
Example:
Continuing from the code above:
employee
  .groupBy(employee("depId"))
  .agg(collect_set(employee("name")), collect_list(employee("name")))
  .collect()
  .foreach(println(_))
Output:
[1,WrappedArray(Jack, Leo),WrappedArray(Leo, Jack)]
[3,WrappedArray(Tom, Kattie),WrappedArray(Tom, Kattie)]
[2,WrappedArray(Marry, Jen),WrappedArray(Marry, Jen, Jen)]
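The difference between the two functions can be mimicked with plain Scala collections: grouping names by depId, collect_list corresponds to keeping every value, while collect_set corresponds to deduplicating them. A minimal sketch, with the (depId, name) pairs copied from employee.json:

```scala
object CollectSketch extends App {
  // (depId, name) pairs from employee.json, in file order
  val employees = Seq(
    (1, "Leo"), (2, "Marry"), (1, "Jack"),
    (3, "Tom"), (3, "Kattie"), (2, "Jen"), (2, "Jen")
  )

  // collect_list analogue: keep all values per group, duplicates preserved
  val lists = employees.groupBy(_._1).map { case (dep, rows) => dep -> rows.map(_._2) }

  // collect_set analogue: deduplicate the values per group
  val sets = lists.map { case (dep, names) => dep -> names.distinct }

  println(lists(2)) // List(Marry, Jen, Jen) — "Jen" kept twice
  println(sets(2))  // List(Marry, Jen)     — deduplicated
}
```

Note that unlike this single-threaded sketch, the element order produced by Spark's collect_list and collect_set is not guaranteed: it depends on how rows are distributed across partitions during the shuffle.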