Spark

Spark从入门到精通63:Dataset的typed操作

2020-07-19  本文已影响0人  勇于自信
1.coalesce和repartition操作

它们都是用来重新定义分区的,区别在于:
coalesce,只能用于减少分区数量,而且可以选择不发生shuffle
repartiton,可以增加分区,也可以减少分区,必须会发生shuffle,相当于是进行了一次重分区操作
实践:
数据集:
employee.json:

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}

department:

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}

代码:

package com.spark.ds

import org.apache.spark.sql.SparkSession

object TypedOperation {
  case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
  case class Department(id: Long, name: String)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TypedOperation")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
      .getOrCreate()
    import spark.implicits._
    val employee = spark.read.json("inputData/employee.json")
    println(employee.getClass.getSimpleName)
    val employeeDS = employee.as[Employee]
    println(employeeDS.getClass.getSimpleName)
    val department = spark.read.json("inputData/department.json")
    val departmentDS = department.as[Department]
 
    // coalesce和repartition操作
    val employeeDSRepartitioned = employeeDS.repartition(7)
    // 看一下它的分区情况
    println(employeeDSRepartitioned.rdd.partitions.size)
    val employeeDSCoalesced = employeeDSRepartitioned.coalesce(3)
    // 看一下它的分区情况
    println(employeeDSCoalesced.rdd.partitions.size)
    employeeDSCoalesced.show()
  }

}

输出结果:

Dataset
Dataset
7
1

+---+-----+------+------+------+
|age|depId|gender|  name|salary|
+---+-----+------+------+------+
| 25|    1|  male|   Leo| 20000|
| 30|    2|female| Marry| 25000|
| 35|    1|  male|  Jack| 15000|
| 42|    3|  male|   Tom| 18000|
| 21|    3|female|Kattie| 21000|
| 30|    2|female|   Jen| 28000|
| 19|    2|female|   Jen|  8000|
+---+-----+------+------+------+
2.distinct和dropDuplicates操作

它们都是用来进行去重的,区别:
distinct,是根据每一条数据,进行完整内容的比对和去重
dropDuplicates,可以根据指定的字段进行去重
实践:

val distinctEmployeeDS = employeeDS.distinct();
    distinctEmployeeDS.show()
    val dropDuplicatesEmployeeDS = employeeDS.dropDuplicates(Seq("name"))  
    dropDuplicatesEmployeeDS.show()  

输出结果:

+---+-----+------+------+------+
|age|depId|gender|  name|salary|
+---+-----+------+------+------+
| 30|    2|female| Marry| 25000|
| 21|    3|female|Kattie| 21000|
| 42|    3|  male|   Tom| 18000|
| 35|    1|  male|  Jack| 15000|
| 30|    2|female|   Jen| 28000|
| 19|    2|female|   Jen|  8000|
| 25|    1|  male|   Leo| 20000|
+---+-----+------+------+------+

+---+-----+------+------+------+
|age|depId|gender|  name|salary|
+---+-----+------+------+------+
| 35|    1|  male|  Jack| 15000|
| 42|    3|  male|   Tom| 18000|
| 30|    2|female|   Jen| 28000|
| 30|    2|female| Marry| 25000|
| 21|    3|female|Kattie| 21000|
| 25|    1|  male|   Leo| 20000|
+---+-----+------+------+------+
3.except、filter和intersect操作

except:获取在当前dataset中有,但是在另外一个dataset中没有的元素
filter:根据我们自己的逻辑,如果返回true,那么就保留该元素,否则就过滤掉该元素
intersect:获取两个数据集的交集
接着以上代码实践:
输入数据employee2.json:

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}

代码:

employeeDS.except(employeeDS2).show()  
employeeDS.filter { employee => employee.age > 30 }.show() 
employeeDS.intersect(employeeDS2).show()  

输出结果:

+---+-----+------+------+------+
|age|depId|gender|  name|salary|
+---+-----+------+------+------+
| 21|    3|female|Kattie| 21000|
| 42|    3|  male|   Tom| 18000|
| 30|    2|female|   Jen| 28000|
| 19|    2|female|   Jen|  8000|
+---+-----+------+------+------+


+---+-----+------+----+------+
|age|depId|gender|name|salary|
+---+-----+------+----+------+
| 35|    1|  male|Jack| 15000|
| 42|    3|  male| Tom| 18000|
+---+-----+------+----+------+


+---+-----+------+-----+------+
|age|depId|gender| name|salary|
+---+-----+------+-----+------+
| 30|    2|female|Marry| 25000|
| 35|    1|  male| Jack| 15000|
| 25|    1|  male|  Leo| 20000|
+---+-----+------+-----+------+
4.map、flatMap和mapPartitions操作
employeeDS.map { employee => (employee.name, employee.salary + 1000) }.show()

    departmentDS.flatMap {
      department => Seq(Department(department.id + 1, department.name + "_1"), Department(department.id + 2, department.name + "_2"))
    }.show()

    employeeDS.mapPartitions { employees => {
      val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
      while(employees.hasNext) {
        var emp = employees.next()
        result += ((emp.name, emp.salary + 1000))
      }
      result.iterator
    }
    }.show()

输出结果:

+------+-----+
|    _1|   _2|
+------+-----+
|   Leo|21000|
| Marry|26000|
|  Jack|16000|
|   Tom|19000|
|Kattie|22000|
|   Jen|29000|
|   Jen| 9000|
+------+-----+


+---+--------------------+
| id|                name|
+---+--------------------+
|  2|Technical Departm...|
|  3|Technical Departm...|
|  3|Financial Departm...|
|  4|Financial Departm...|
|  4|     HR Department_1|
|  5|     HR Department_2|
+---+--------------------+



+------+-----+
|    _1|   _2|
+------+-----+
|   Leo|21000|
| Marry|26000|
|  Jack|16000|
|   Tom|19000|
|Kattie|22000|
|   Jen|29000|
|   Jen| 9000|
+------+-----+
上一篇下一篇

猜你喜欢

热点阅读