Spark from Beginner to Master 65: Aggregate Functions on Datasets

2020-07-20  勇于自信
1. Basic Dataset aggregate functions

The basic Dataset aggregate functions are:
avg, sum, max, min, count, countDistinct
Hands-on example:
Input data:
employee.json:

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}

department.json:

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}

Code:

package com.spark.ds

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggregateFunction {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("AggregateFunction")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
      .getOrCreate()
    import spark.implicits._
    // Load the two JSON datasets
    val employee = spark.read.json("inputData/employee.json")
    val department = spark.read.json("inputData/department.json")
    employee
      .join(department, $"depId" === $"id")  // join each employee to its department
      .groupBy(department("name"))           // group by department name
      .agg(avg(employee("salary")), sum(employee("salary")), max(employee("salary")),
        min(employee("salary")), count(employee("name")), countDistinct(employee("name")))
      .show()

  }
}

Output:

+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
|                name|       avg(salary)|sum(salary)|max(salary)|min(salary)|count(name)|count(DISTINCT name)|
+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
|Technical Department|           17500.0|      35000|      20000|      15000|          2|                   2|
|       HR Department|           19500.0|      39000|      21000|      18000|          2|                   2|
|Financial Department|20333.333333333332|      61000|      28000|       8000|          3|                   2|
+--------------------+------------------+-----------+-----------+-----------+-----------+--------------------+
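For intuition, the same per-group statistics can be reproduced on plain Scala collections. This is only an illustrative sketch (not Spark code) using a hypothetical `Employee` case class that mirrors the records in employee.json above:

```scala
// Plain-Scala sketch of what groupBy + agg computes on the sample data.
case class Employee(name: String, depId: Int, salary: Double)

val employees = Seq(
  Employee("Leo", 1, 20000), Employee("Marry", 2, 25000),
  Employee("Jack", 1, 15000), Employee("Tom", 3, 18000),
  Employee("Kattie", 3, 21000), Employee("Jen", 2, 28000),
  Employee("Jen", 2, 8000)
)

// Group by department id and compute the same six aggregates.
val stats = employees.groupBy(_.depId).map { case (dep, rows) =>
  val salaries = rows.map(_.salary)
  dep -> (
    salaries.sum / salaries.size,      // avg(salary)
    salaries.sum,                      // sum(salary)
    salaries.max,                      // max(salary)
    salaries.min,                      // min(salary)
    rows.size,                         // count(name)
    rows.map(_.name).distinct.size     // countDistinct(name)
  )
}

// depId = 2 (Financial Department) reproduces the corresponding Spark row:
// sum 61000, max 28000, min 8000, count 3, two distinct names.
println(stats(2))
```

Note how `count` sees both "Jen" rows while `countDistinct` collapses them, which is exactly why the Financial Department row shows 3 and 2.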
2. Collection aggregate functions: collect_list and collect_set

Both collect_list and collect_set gather the values of a given column within each group into a single array, which is commonly used to pivot rows into columns. For example:
depId=1, employee=leo
depId=1, employee=jack
⇒ depId=1, employees=[leo, jack]
The difference is that collect_list keeps duplicates, while collect_set de-duplicates the values.
Example:
Continuing from the code above:

    employee
      .groupBy(employee("depId"))
      .agg(collect_set(employee("name")), collect_list(employee("name")))
      .collect()
      .foreach(println)

Output:

[1,WrappedArray(Jack, Leo),WrappedArray(Leo, Jack)]
[3,WrappedArray(Tom, Kattie),WrappedArray(Tom, Kattie)]
[2,WrappedArray(Marry, Jen),WrappedArray(Marry, Jen, Jen)]
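The de-duplication difference is easy to see on plain Scala collections. A minimal sketch mirroring the depId=2 group (note also that Spark does not guarantee element order inside the arrays produced by collect_list/collect_set):

```scala
// Names in department 2 from employee.json; "Jen" appears twice.
val names = Seq("Marry", "Jen", "Jen")

val asList = names           // like collect_list: duplicates kept
val asSet  = names.distinct  // like collect_set: duplicates removed

println(asList)  // List(Marry, Jen, Jen)
println(asSet)   // List(Marry, Jen)
```

This matches the output row for depId=2 above: the set has two elements, the list has three.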