spark大数据学习

Spark Sql之DataFrame&DataSet

2020-03-16  本文已影响0人  TZX_0710

Spark Sql是spark中的一个子模块,主要用于操作结构化数据。它具有如下特点:

  1. 能够将SQL查询于Spark程序无缝混合,允许您使用SQL或DataFrame Api 对结构化数据进行查询。
  2. 支持多种开发语言
  3. 支持多种上百种的外部数据源,包括Hive,Avro,Parquest,ORC,JSON和JDBC等
  4. 支持HiveQL语法以及Hive SerDes和UDF,允许您访问现有的Hive库
  5. 支持标准的JDBC和ODBC连接
  6. 支持优化器,列式存储和代码生成特性等
  7. 支持扩展并能保证容错

一、 DataFrame&DataSet

  1. 为了支持结构化数据处理,Spark提供了新的数据结构DataFrame。DataFrame是一个由具名列组成的数据集。由于SparkSQL支持多种语言开发,所以每种语言都定义了DataFrame的抽象
语言 主要抽象
Scala DataSet[T]&DataFrame(Dataset[row])
Java DataSet[T]
Python DataFrame
R DataFrame

1.1. DataFrame对比RDD

DataFrame和RDD最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据。
DataFrame内部的有明确Schema结构,即列名、列字段类型都是已知的,者带来的好处是可以减少读取以及更好的执行计划,从而保证查询效率。
DataFrame和Rdd的应用场景:

  1. 函数式编程使用RDD
  2. 数据是非结构化的使用RDD
  3. 数据是结构化的或者半结构化的,出于性能上的比对采用DataFrame
  1. DataSet
    DataSet也是分布式的集合,在Spark1.6引入,它继承了RDD和DataFrame的优点,具备强类型的特点,可以理解为RDD和DataFrame的加强版,但是只能在scala和Java中使用。在Spark2.0之后,为了方便开发者,Spark将DataFrame和Dataset的API融合到一起,提供了结构化的API。用户可以通过一套标准的API就能完成对两者的操作。
    DataFrame被标记weiUntype API,而DataSet被标记为Typed API
  2. 关于类型安全,在实际使用中,如果你用的是Spqrk Sql的查询语句,则直到运行的时候才能发现语法错误,如果使用的是DataFrame和Dataset,则在编译的时候就可以发现错误。DataFrame和Dataset主要区别在于:在DataFrame中,当你嗲用了Api之外的函数,编译器就会报错。但如果使用了一个不存在的字段名字,编译器依然无法发现。而Dataset的Api都是用Lambda函数和JVM类型对象表示的,所有不匹配的类型参数在编译的时候就会被发现
  3. untyped&typed

在上面介绍过DataFrame的Api被标记为Untyped Api,而DataSet Api被标记为Typed Api。DataFrame的untyped是相对于语言或API层面而已。DataSet是Typed,强类型。Dataset的字段名和错误在编译时候会被IDE发现。

二、 DataFrame&DataSet&Rdd总结

  1. Rdd适合非结构化数据处理
  2. DataFrame&Dataset 可以通过统一的Structure Api 进行访问,而Rdd则更适合函数式编程
  3. 相比于DataFrame而言,Dataset是强类型的,有着更为严格的静态类型检查。
  4. Dataset、DataFrame、Sql的底层都依赖了Rdd Api,并对外提供结构化的访问接口。

三、SparkSql的运行原理

  1. 进行DataFrame/DataSet/Sql编程
  2. 如果是有效的代码,Spark会将其转换为一个逻辑计划
  3. Spark将此逻辑计划转换为物理计划,同时进行代码优化
  4. Spark然后在集群上执行这个物理计划。

创建DataFrame&DataSet


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object spark_example {

  //定义全局Class 类似Golang的struct  或者Java的JavaBean
  case class Dept(DeptNo:Long,DName:String,Loc:String)

  def main(args: Array[String]): Unit = {
    //1.创建DataFrame
    //Spark的所有功能入口点是SparkSession 我们通过SParkSession.builder()创建
  val spark=SparkSession.builder().appName("spark-sql").master("local[2]").getOrCreate()
    //从json文件当中创建DataFrame
  val df=spark.read.json("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
    //展示数据
  df.show()


    import spark.implicits._
    //2.创建DataSet   2.1由外部数据创建
   //1.引入隐式转换
    //2.创建case class 类似于JavaBean
    //3.由外部数据创建Datasets
   val ds=spark.read.json("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json").as[Dept]
   ds.show()

   //2.2
   //1.引入隐式转换 2.创建classBean  3.内部创建Datasets
   val datasets=Seq(Dept(41,"测试部门","mc"),Dept(42,"技术部门","js")).toDS()
   datasets.show()

    //RDD创建DataFrame 从TextFile创建一个Rdd  使用rdd的map算子\t分割然后映射到Dept
    //toDs 转换成DataSet toDf转换成DataFrame
  val rddTods=spark.sparkContext.textFile("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.txt")
      .map(x=>x.split("\t")).map(line=>Dept(line(0).trim.toLong,line(1).trim.toString,line(2).trim.toString)).toDS()
   rddTods.show()

    //指定Schema
    //定义每个列的类型
    val fields=Array(StructField("DeptNo",LongType,nullable = true),StructField("DName",StringType,nullable = true),StructField("Loc",StringType,nullable = true))
    //创建schema
   val schema=StructType(fields)
    //读取text文件获取Rdd
   val deptRdd= spark.sparkContext.textFile("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.txt")
    //采用map算子重构rdd算子
  val  schemaRdd=deptRdd.map(x=>x.split("\t")).map(line=>Row(line(0).trim.toLong,line(1).toString,line(2).toString))
    //根据重新构建的rdd  构建DataFrame
  val deptDf= spark.createDataFrame(schemaRdd,schema)
  deptDf.show()

    //DataFrame和Dataset互相转换
   df.as[Dept]
    //dataset转换成DataFrame
   datasets.toDF()
    //引用列
    deptDf.select("DeptNo").show()
    //新增列 withColumn的参数 第一个为参数名称 第二个参数为原有的某一列
    deptDf.withColumn("sal",$"deptNo").show()
    //删除列
    deptDf.drop("deptNo").show()
    //重命名列
    deptDf.withColumnRenamed("DName","deptName").show()
    //使用SparkSql进行查询
    //把Dataset注册为临时视图    createOrReplaceTempViewl临时会话 周期仅在会话期间
    datasets.createOrReplaceTempView("dept")
    spark.sql("SELECT * FROM dept").show()
    //创建全局临时视图 全局临时视图 他被定义在global_temp数据库下面,需要使用限定的名称引用 global_temp
    datasets.createOrReplaceGlobalTempView("deps")
    spark.sql("select * from global_temp.deps").show()
  }
}

控制台运行结果

##创建DataFrame
+------+----------+--------+
|DEPTNO|     DNAME|     LOC|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

###创建DataSet
+------+----------+--------+
|DEPTNO|     DNAME|     LOC|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
### 内部创建Dataset
+------+--------+---+
|DeptNo|   DName|Loc|
+------+--------+---+
|    41|测试部门| mc|
|    42|技术部门| js|
+------+--------+---+

### rdd创建DataFrame
+------+----------+--------+
|DeptNo|     DName|     Loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
### 指定schema创建DataFrame
+------+----------+--------+
|DeptNo|     DName|     Loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
###指定列获取
+------+
|DeptNo|
+------+
|    10|
|    20|
|    30|
|    40|
+------+
### 新增列
+------+----------+--------+---+
|DeptNo|     DName|     Loc|sal|
+------+----------+--------+---+
|    10|ACCOUNTING|NEW YORK| 10|
|    20|  RESEARCH|  DALLAS| 20|
|    30|     SALES| CHICAGO| 30|
|    40|OPERATIONS|  BOSTON| 40|
+------+----------+--------+---+
#### 删除列
+----------+--------+
|     DName|     Loc|
+----------+--------+
|ACCOUNTING|NEW YORK|
|  RESEARCH|  DALLAS|
|     SALES| CHICAGO|
|OPERATIONS|  BOSTON|
+----------+--------+
###重命名列
+------+----------+--------+
|DeptNo|  deptName|     Loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
###创建临时视图擦寻
+------+--------+---+
|DeptNo|   DName|Loc|
+------+--------+---+
|    41|测试部门| mc|
|    42|技术部门| js|
+------+--------+---+
###创建全局视图查询
+------+--------+---+
|DeptNo|   DName|Loc|
+------+--------+---+
|    41|测试部门| mc|
|    42|技术部门| js|
+------+--------+---+

在编写示例代码的时候 创建DataSet遇到如下报错:

Error:(350, 43) Unable to find encoder for type stored in a Dataset.  Primitive types >(Int, String, etc) and Product types (case classes) are supported by importing >spark.implicits._  Support for serializing other types will be added in future >releases.

解决方案:
没有引入隐式转换:

import spark.implicits._
上一篇 下一篇

猜你喜欢

热点阅读