Spark从入门到放弃—Spark SQL

2021-03-02  本文已影响0人  HaloZhang

简介

Spark SQL是Spark中用于结构化数据处理的一个模块。与Spark RDD API不同,Spark SQL相关接口提供了关于数据结构以及计算执行过程的更多信息。Spark SQL在内部会根据这些信息去执行额外的优化操作。Spark SQL将Spark的函数式编程API与SQL查询集成在一起,它支持通过SQL或者Hive语言来查询数据,同时提供了一种叫做DataFrame和Dataset的结构来对结构化数据进行抽象,并且具有相应的DataFrame API和Datasets API来与Spark SQL进行交互。

Spark SQL特点

Spark SQL具有以下几个特点:

DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统关系数据库中的二维表格。DataFrame可以通过多种数据源构造得到,比如:结构性数据、Hive表格、外部数据库、或者已有的RDD。DataFrame API支持多种编程语言,包括Scala、Java、Python以及R。DataFrame与RDD的主要区别在于,前者带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对隐藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame的特点如下:

DataFrame实践

在Spark Core中,如果想要执行应用程序,需要首先构建上下文环境对象Spark Context,Spark SQL 其实可以理解为对 Spark Core的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。

SparkSession是Spark最新的SQL查询起始点,SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用Spark-shell的时候,Spark框架会自动创建一个名为”spark“的SparkSession对象,我们可以直接使用。如下图:

DataFrame代码实践

从数据源创建DataFrame

spark安装包目录下默认为我们准备了相关数据,以其中的people.json为例,路径为“/spark-x.x.x-bin-hadoop2.7/examples/src/main/resources/people.json”,内容为: people.json内容

使用下面的代码来加载json文件:

scala> val df = spark.read.json("./examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

打印出DataFrame的结构信息,代码如下:

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

可以看到DataFrame每行数据的第一个属性“age”是long类型,第二个属性“name”是string类型。

DataFrame操作

我们可以使用两种风格的语法来对数据进行操作,分别是SQL语句和DSL(domain-specific language)语法,使用DSL语法风格的话,就不必去创建临时视图了,下面分别介绍。

DSL语法

Select算子

比如我们想选择指定的"age"列,可以使用select方法,代码如下:

scala> df.select("age").show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+

可以看到只有”age“那一列被输出来了。

Filter算子

使用filter操作来对数据进行筛选,比如我们需要筛选出年龄大于27岁的人,代码如下:

scala> df.filter(df("age") > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> df.filter('age > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

上述代码展示了3种写法,结果是相同的。

groupBy算子

使用groupBy算子来根据某一列对数据进行分组,并且查看条数,代码如下:

scala> df.groupBy("name").count.show()
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+

上述代码根据属性“name”对数据进行了分组,并统计了每一个分组的个数。

SQL语法

SQL语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助,即SQL是针对视图的,而不是针对DataFrame的。
还是以上面创建的DataFrame为例,为了使用SQL语法来对数据进行查询,我们首先创建一个临时表。代码如下:

scala> df.createOrReplaceTempView("people")

接下来我们就可以在这个视图上进行SQL查询了。首先全选表格再打印出来,代码如下:

scala> val sqldf = spark.sql("SELECT * FROM people")
sqldf: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqldf.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

筛选出指定的“name”列:

scala> val sqldf = spark.sql("SELECT name FROM people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

对数据进行过滤,选出年龄大于27岁的数据:

scala> val sqldf = spark.sql("SELECT * FROM people WHERE age >= 27").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

根据name对数据进行分组,然后统计每个组的个数:

scala> val sqldf = spark.sql("SELECT name, COUNT(*) FROM people GROUP BY name").show()
+-------+--------+
|   name|count(1)|
+-------+--------+
|Michael|       1|
|   Andy|       1|
| Justin|       1|
+-------+--------+

注意:

普通临时表是SparkSession范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如global_temp.people。

全局表的创建如下:

df.createGlobalTempView("people1")
scala> spark.sql("SELECT * FROM global_temp.people1").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> spark.newSession().sql("SELECT * FROM global_temp.people1").show() //创建新的Session来进行SQL查询
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

如果使用第二种方式去访问之前创建的临时表“people”,则会报错,如下:

scala> spark.newSession().sql("SELECT * FROM people").show()
org.apache.spark.sql.AnalysisException: Table or view not found: people; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [people]

错误显示没有找到对应的表。

DataSet

DataSet是一个分布式数据集合,它是 Spark 1.6 版本中新增的一个接口, 它结合了 RDD(强类型,可以使用强大的 lambda 表达式函数) 和 Spark SQL 的优化执行引擎的好处。Dataset 可以从 JVM 对象构造得到,随后可以使用函数式的变换(map,flatMap,filter 等)进行操作。DataSet是具有强类型的数据集合,在构造的时候需要提供对应的类型信息。

DataSet创建

在Scala中创建Dataset,要定义Scala的case类,case类是具有以下特征的类:

要创建DataSet,首先我们需要为数据集定义一个case class,以上文的people.json文件为例,定义case class如下:

scala> case class employee(name: String, salary: Long)
defined class employee

定义好case class之后,它表示数据中的单个记录,我们可以通过样例类的属性来直接获取对应的值。下面是从文件中创建Dataset的代码:

//注意我们默认读出来的数据是DataFrame,通过as方法转换为指定的行类型。
scala>  val employeeDS = spark.read.json("./examples/src/main/resources/employees.json").as[employee]
employeeDS: org.apache.spark.sql.Dataset[employee] = [name: string, salary: bigint]

scala> employeeDS.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

DataSet操作

对数据进行过滤,筛选出薪水大于4000的数据:

scala> employeeDS.filter(line => line.salary > 4000).show
+----+------+
|name|salary|
+----+------+
|Andy|  4500|
+----+------+

对数据进行映射,只取每一行数据的name字段,如下:

scala> employeeDS.map(line => line.name).show
+-------+
|  value|
+-------+
|Michael|
|   Andy|
| Justin|
|  Berta|
+-------+

对数据按照名称进行分组聚合:

scala> val fc = employeeDS.groupBy("name").count()
fc: org.apache.spark.sql.DataFrame = [name: string, count: bigint]

scala> fc.show
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
|  Berta|    1|
| Justin|    1|
+-------+-----+

根据每行数据对应的key来进行分组:

scala> val ec = employeeDS.groupByKey(x => x.name).count()
ec: org.apache.spark.sql.Dataset[(String, Long)] = [key: string, count(1): bigint]

scala> ec.show
+-------+--------+
|    key|count(1)|
+-------+--------+
|Michael|       1|
|   Andy|       1|
|  Berta|       1|
| Justin|       1|
+-------+--------+

对两个不同的DataSet进行连接操作,整合成一个。以上面形成的DataSet为例,我们构造一个假的DataSet,然后使用join方法来进行连接操作。

// 构造样例类
scala> case class EmployeeMetadata(name: String, number: BigInt)
defined class EmployeeMetadata

scala> val nameList = List("Michael","Andy","Berta","Justin")
nameList: List[String] = List(Michael, Andy, Berta, Justin)

scala> val metaData = spark.range(4).map(x => (nameList(x.toInt), x)).withColumnRenamed("_1", "name").withColumnRenamed("_2", "number").as[EmployeeMetadata]
metaData: org.apache.spark.sql.Dataset[EmployeeMetadata] = [name: string, number: bigint]

scala> metaData.join(fc, metaData.col("name") === fc.col("name")).show
+-------+------+-------+-----+
|   name|number|   name|count|
+-------+------+-------+-----+
|Michael|     0|Michael|    1|
|   Andy|     1|   Andy|    1|
|  Berta|     2|  Berta|    1|
| Justin|     3| Justin|    1|
+-------+------+-------+-----+

DataFrame、RDD、DataSet相互转换

实际开发程序的时候,我们经常需要在RDD、DataFrame、DataSet之间相互操作,此时需要引入:

import spark.implicits._

但是如果我们使用spark-shell的话,默认它已经帮我们引入了,所以就无需再单独添加这行代码了。这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
以上文的employee.json为例,我们首先创建一个DataFrame,如下:

scala> val df = spark.read.json("./examples/src/main/resources/employees.json")
rdd: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> df.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

使用rdd方法将其转化为RDD:

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[544] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([Michael,3000], [Andy,4500], [Justin,3500], [Berta,4000])

scala> array(0)
res157: org.apache.spark.sql.Row = [Michael,3000]

scala> array(0)(0)
res158: Any = Michael

scala> array(0)(1)
res159: Any = 3000

定义样例类,使用as方法将DataFrame转化为DataSet:

scala> case class Employee(name:String, salary:BigInt)
defined class Employee

scala> val ds = df.as[Employee]
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]

scala> ds.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

反之,使用toDF()方法可以将DataSet转换回DataFrame:

scala> val ddf = ds.toDF()
ddf: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> ddf.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

使用rdd方法将DataSet转换为RDD:

scala> val rddd = ds.rdd
rddd: org.apache.spark.rdd.RDD[Employee] = MapPartitionsRDD[557] at rdd at <console>:25

scala> rddd.first()
res165: Employee = Employee(Michael,3000)

我们再来定义一个RDD,并将其转换为DataFrame:

scala> val rdd = sc.makeRDD(List(("halo", 27), ("ice",18)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[561] at makeRDD at <console>:24

scala> rdd.collect
res169: Array[(String, Int)] = Array((halo,27), (ice,18))

使用toDF()方法,将其转换为DataFrame,注意要传入对应的列名,否则默认的列名是以"_"加编号构成,如下:

scala> rdd.toDF().show()
+----+---+
|  _1| _2|
+----+---+
|halo| 27|
| ice| 18|
+----+---+

scala> rdd.toDF("name", "age").show
+----+---+
|name|age|
+----+---+
|halo| 27|
| ice| 18|
+----+---+

SparkSQL能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。
我们首先定义case类,然后使用toDS方法,将RDD转换为DataSet:

case class User(name:String, age:Int)

进行转换:

scala> val ds = rdd.map(t=>User(t._1, t._2)).toDS()
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds.show
+----+---+
|name|age|
+----+---+
|halo| 27|
| ice| 18|
+----+---+
用一张图来说明这三者之间的转换关系: 图自尚硅谷课件

DataFrame、RDD、DataSet三者的关系

在开始这三者的比较之前,首先回顾一下Spark中关于这三种数据类型的定义:

下面从这15个方面来对这三者进行全方位的对比总结:

1. 发布时间

2. 数据表示

3. 数据格式

4. 数据源API

5. 不变性和可操作性

6. 编译期类型安全

7. 优化

8. 序列化

9. 垃圾回收

10. 效率/内存使用

11. 惰性机制

12. 编程语言支持

13. 聚合操作

14. 应用领域

参考

上一篇 下一篇

猜你喜欢

热点阅读