Spark

Spark从入门到精通60:Dataset的untypd操作案例

2020-07-13  本文已影响0人  勇于自信

统计案例:计算部门的平均薪资和年龄
需求:
1、只统计年龄在20岁以上的员工
2、根据部门名称和员工性别为粒度来进行统计
3、统计出每个部门分性别的平均薪资和年龄

数据如下:
department.json:

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}

employee.json:

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}

实现代码:

package session

import org.apache.spark.sql.SparkSession

object DepartmentAvgSalaryAndAgeStat {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(DepartmentAvgSalaryAndAgeStat.getClass.getName)
      .master("local")
      .config("spark.sql.warehouse.dir","d:/")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val employee = spark.read.json("data\\depart\\employee.json")
    val department = spark.read.json("data\\depart\\department.json")
    employee.filter("age > 20").join(department, $"depId" === $"id")
      .groupBy(department("name"),employee("gender"))
      .agg(avg(employee("salary")),avg(employee("age")))
      .show()
  }
}

运行结果如下:

+--------------------+------+-----------+--------+
|                name|gender|avg(salary)|avg(age)|
+--------------------+------+-----------+--------+
|       HR Department|female|    21000.0|    21.0|
|Technical Department|  male|    17500.0|    30.0|
|Financial Department|female|    26500.0|    30.0|
|       HR Department|  male|    18000.0|    42.0|
+--------------------+------+-----------+--------+

注意:这里untyped join,两个表的字段的连接条件,需要使用三个等号,这里的dataframe == dataset[Row]
dataframe的类型是Row,所以是untyped类型,弱类型,dataset的类型通常是我们自定义的case class,所以是typed类型,强类型

上一篇下一篇

猜你喜欢

热点阅读