Spark Sql之DataFrame&DataSet
2020-03-16 本文已影响0人
TZX_0710
Spark Sql是spark中的一个子模块,主要用于操作结构化数据。它具有如下特点:
- 能够将SQL查询于Spark程序无缝混合,允许您使用SQL或DataFrame Api 对结构化数据进行查询。
- 支持多种开发语言
- 支持多种上百种的外部数据源,包括Hive,Avro,Parquest,ORC,JSON和JDBC等
- 支持HiveQL语法以及Hive SerDes和UDF,允许您访问现有的Hive库
- 支持标准的JDBC和ODBC连接
- 支持优化器,列式存储和代码生成特性等
- 支持扩展并能保证容错
一、 DataFrame&DataSet
- 为了支持结构化数据处理,Spark提供了新的数据结构DataFrame。DataFrame是一个由具名列组成的数据集。由于SparkSQL支持多种语言开发,所以每种语言都定义了DataFrame的抽象
语言 | 主要抽象 |
---|---|
Scala | DataSet[T]&DataFrame(Dataset[row]) |
Java | DataSet[T] |
Python | DataFrame |
R | DataFrame |
1.1. DataFrame对比RDD
DataFrame和RDD最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据。
DataFrame内部的有明确Schema结构,即列名、列字段类型都是已知的,者带来的好处是可以减少读取以及更好的执行计划,从而保证查询效率。
DataFrame和Rdd的应用场景:
- 函数式编程使用RDD
- 数据是非结构化的使用RDD
- 数据是结构化的或者半结构化的,出于性能上的比对采用DataFrame
- DataSet
DataSet也是分布式的集合,在Spark1.6引入,它继承了RDD和DataFrame的优点,具备强类型的特点,可以理解为RDD和DataFrame的加强版,但是只能在scala和Java中使用。在Spark2.0之后,为了方便开发者,Spark将DataFrame和Dataset的API融合到一起,提供了结构化的API。用户可以通过一套标准的API就能完成对两者的操作。
DataFrame被标记weiUntype API,而DataSet被标记为Typed API
- 关于类型安全,在实际使用中,如果你用的是Spqrk Sql的查询语句,则直到运行的时候才能发现语法错误,如果使用的是DataFrame和Dataset,则在编译的时候就可以发现错误。DataFrame和Dataset主要区别在于:
在DataFrame中,当你嗲用了Api之外的函数,编译器就会报错。但如果使用了一个不存在的字段名字,编译器依然无法发现。而Dataset的Api都是用Lambda函数和JVM类型对象表示的,所有不匹配的类型参数在编译的时候就会被发现
- untyped&typed
在上面介绍过DataFrame的Api被标记为Untyped Api,而DataSet Api被标记为Typed Api。DataFrame的untyped是相对于语言或API层面而已。DataSet是Typed,强类型。Dataset的字段名和错误在编译时候会被IDE发现。
二、 DataFrame&DataSet&Rdd总结
- Rdd适合非结构化数据处理
- DataFrame&Dataset 可以通过统一的Structure Api 进行访问,而Rdd则更适合函数式编程
- 相比于DataFrame而言,Dataset是强类型的,有着更为严格的静态类型检查。
- Dataset、DataFrame、Sql的底层都依赖了Rdd Api,并对外提供结构化的访问接口。
三、SparkSql的运行原理
- 进行DataFrame/DataSet/Sql编程
- 如果是有效的代码,Spark会将其转换为一个逻辑计划
- Spark将此逻辑计划转换为物理计划,同时进行代码优化
- Spark然后在集群上执行这个物理计划。
创建DataFrame&DataSet
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object spark_example {
//定义全局Class 类似Golang的struct 或者Java的JavaBean
case class Dept(DeptNo:Long,DName:String,Loc:String)
def main(args: Array[String]): Unit = {
//1.创建DataFrame
//Spark的所有功能入口点是SparkSession 我们通过SParkSession.builder()创建
val spark=SparkSession.builder().appName("spark-sql").master("local[2]").getOrCreate()
//从json文件当中创建DataFrame
val df=spark.read.json("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
//展示数据
df.show()
import spark.implicits._
//2.创建DataSet 2.1由外部数据创建
//1.引入隐式转换
//2.创建case class 类似于JavaBean
//3.由外部数据创建Datasets
val ds=spark.read.json("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json").as[Dept]
ds.show()
//2.2
//1.引入隐式转换 2.创建classBean 3.内部创建Datasets
val datasets=Seq(Dept(41,"测试部门","mc"),Dept(42,"技术部门","js")).toDS()
datasets.show()
//RDD创建DataFrame 从TextFile创建一个Rdd 使用rdd的map算子\t分割然后映射到Dept
//toDs 转换成DataSet toDf转换成DataFrame
val rddTods=spark.sparkContext.textFile("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.txt")
.map(x=>x.split("\t")).map(line=>Dept(line(0).trim.toLong,line(1).trim.toString,line(2).trim.toString)).toDS()
rddTods.show()
//指定Schema
//定义每个列的类型
val fields=Array(StructField("DeptNo",LongType,nullable = true),StructField("DName",StringType,nullable = true),StructField("Loc",StringType,nullable = true))
//创建schema
val schema=StructType(fields)
//读取text文件获取Rdd
val deptRdd= spark.sparkContext.textFile("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.txt")
//采用map算子重构rdd算子
val schemaRdd=deptRdd.map(x=>x.split("\t")).map(line=>Row(line(0).trim.toLong,line(1).toString,line(2).toString))
//根据重新构建的rdd 构建DataFrame
val deptDf= spark.createDataFrame(schemaRdd,schema)
deptDf.show()
//DataFrame和Dataset互相转换
df.as[Dept]
//dataset转换成DataFrame
datasets.toDF()
//引用列
deptDf.select("DeptNo").show()
//新增列 withColumn的参数 第一个为参数名称 第二个参数为原有的某一列
deptDf.withColumn("sal",$"deptNo").show()
//删除列
deptDf.drop("deptNo").show()
//重命名列
deptDf.withColumnRenamed("DName","deptName").show()
//使用SparkSql进行查询
//把Dataset注册为临时视图 createOrReplaceTempViewl临时会话 周期仅在会话期间
datasets.createOrReplaceTempView("dept")
spark.sql("SELECT * FROM dept").show()
//创建全局临时视图 全局临时视图 他被定义在global_temp数据库下面,需要使用限定的名称引用 global_temp
datasets.createOrReplaceGlobalTempView("deps")
spark.sql("select * from global_temp.deps").show()
}
}
控制台运行结果
##创建DataFrame
+------+----------+--------+
|DEPTNO| DNAME| LOC|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
###创建DataSet
+------+----------+--------+
|DEPTNO| DNAME| LOC|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
### 内部创建Dataset
+------+--------+---+
|DeptNo| DName|Loc|
+------+--------+---+
| 41|测试部门| mc|
| 42|技术部门| js|
+------+--------+---+
### rdd创建DataFrame
+------+----------+--------+
|DeptNo| DName| Loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
### 指定schema创建DataFrame
+------+----------+--------+
|DeptNo| DName| Loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
###指定列获取
+------+
|DeptNo|
+------+
| 10|
| 20|
| 30|
| 40|
+------+
### 新增列
+------+----------+--------+---+
|DeptNo| DName| Loc|sal|
+------+----------+--------+---+
| 10|ACCOUNTING|NEW YORK| 10|
| 20| RESEARCH| DALLAS| 20|
| 30| SALES| CHICAGO| 30|
| 40|OPERATIONS| BOSTON| 40|
+------+----------+--------+---+
#### 删除列
+----------+--------+
| DName| Loc|
+----------+--------+
|ACCOUNTING|NEW YORK|
| RESEARCH| DALLAS|
| SALES| CHICAGO|
|OPERATIONS| BOSTON|
+----------+--------+
###重命名列
+------+----------+--------+
|DeptNo| deptName| Loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
###创建临时视图擦寻
+------+--------+---+
|DeptNo| DName|Loc|
+------+--------+---+
| 41|测试部门| mc|
| 42|技术部门| js|
+------+--------+---+
###创建全局视图查询
+------+--------+---+
|DeptNo| DName|Loc|
+------+--------+---+
| 41|测试部门| mc|
| 42|技术部门| js|
+------+--------+---+
在编写示例代码的时候 创建DataSet遇到如下报错:
Error:(350, 43) Unable to find encoder for type stored in a Dataset. Primitive types >(Int, String, etc) and Product types (case classes) are supported by importing >spark.implicits._ Support for serializing other types will be added in future >releases.
解决方案:
没有引入隐式转换:import spark.implicits._