4 Spark SQL
DataFrame是一种不可变的分布式数据集,类似于Python pandas DataFrame,允许用户轻松地使用结构化的数据(如数据表)。这样我们就可以Spark SQL来查询结构化的数据或使用Spark表达式方法(注意,这里不使用lambda表达式)。
以前的RDD,由于JAVA的JVM和Python的Py4J之间的通信开销,使得Python相对于Scala和JAVA执行查询会明显变慢。而现在Spark使用Catalyst Optimizer能够显著提高Spark的查询性能。
1.Python和RDD之间的通信
image每次使用RDD 执行PySpark程序时,潜在地需要巨大的开销来执行作业,在PySpark驱动器中,Spark Context通过Py4J启动一个使用JavaSparkContext的JVM。所有的RDD转换最初都映射到Java中的PythonRDD对象。一旦这些任务被推送到Spark的工作节点,PythonRDD对象就使用管道pipe启动Python的子进程中,发送代码和数据到Python中进行处理。
虽然该方法允许PySpark将数据处理分布到多个工作节点的多个Python子进程中,但是如你所见,Python和JVM之间还是有很多上下文切换和通信开销的。
所以处理大数据量时不建议在PySpark中使用RDD。
2.Catalyst Optimizer(Catalyst优化器)
Spark SQL引擎如此之快的原因在于Catalyst Optimizer。
imageCatalyst Optimizer编译优化了逻辑计划,而且还有一个能够确保生成最有效的物理计划的成本优化器。
3.利用DataFrame加速PySpark
DataFrame和Catalyst优化器配合使用(相较于RDD)可以大大提高PySpark的查询性能。
引入DataFrame前,Python查询速度普遍比使用RDD 的Scala查询慢,主要源于Python和JVM之间的通信开销。
image注意:虽然DataFrame可以提升PySpark性能,但是有一些例外,如Python UDF(User-Defined-Function)的使用。
4.Spark SQL的特点
Spark SQL提供了比Spark Core更为高层的用于处理结构化数据的抽象。结构化数据包括存储在RDBMS数据库中的数据和NoSQL数据库中的数据,以及诸如Parquet JSON CSV等数据。
Spark SQL不仅为Spark提供了SQL接口,它还有更为宽泛的设计目的:让Spark更加易用,提升开发者的生产力,让Spark应用运行得更快。
Spark SQL支持多种查询语言,包括SQL、HiveSQL等,还可以用于交互分析,无论哪种场景,内部都会调用Spark Core API在Spark集群上执行查询操作。
(1)和其他Spark库集成
Spark SQL可以用于交互分析和对历史数据进行批处理;
Spark SQL可以和Spark Streaming一起进行实时数据流处理;
Spark SQL可以和Spark ML一起在机器学习应用中使用,例如Spark SQL可以用于特征工程。
(2)可用性
相较于Spark Core API,Spark SQL API为多种功能提供了函数,其中包括:选择列、过滤行、聚合列、合并数据集以及其他数据处理、分析中的常见任务。
相较于Spark Core API,Spark SQL API只需要更少的代码就能处理结构化数据。
(3)数据源
Spark SQL支持多种数据源,可以处理文件、RDBMS数据库、NoSQL数据库。文件可以位于本地或者HDFS 、S3。Spark SQL支持的文件格式包括CSV、JSON、Parquet等。
Spark SQL支持的RDBMS数据库有Oracle、 MySQL、 PostgreSQL、 MS SQL Server,以及其他可以使用JDBC连接的数据库。
Spark SQL支持的NoSQL数据库有HBase、Cassandra。
(4)数据处理接口
Spark SQL支持三种数据处理接口:SQL、HiveSQL和集成语言的查询。可以用Spark SQL来替代Hive从而获得更好的性能。
5.Spark SQL提升性能
Spark SQL采取多种技术来让数据处理应用运行得更快,包括减少磁盘I/O、内存列式缓存、查询优化和代码生成。
(1)磁盘I/O
磁盘I/O是缓慢的,它会显著影响查询执行时间,Spark SQL在读取数据时会跳过没被查询到的分区、行、列。
(2)分区
(3)列存储:如Parquet格式文件
(4)内存中的列式缓存
(5)行跳过
(6)谓词下推降低磁盘I/O
(7)查询优化:Catalyst优化器(分析、逻辑优化、物理计划和代码生成)
6.Spark SQL应用
(1)ETL
(2)数据可视化
(3)分布式JDBC/ODBC/ SQL查询引擎
Spark SQL预先打包了一个Thrift/JDBC/ODBC服务器,客户端应用可以直接连接到这个服务器,并通过Thrift/JDBC/ODBC接口提交SQL/HiveSQL查询。
Spark SQL自带了一个名为Beeline的命令行客户端,用于提交HiveSQL查询。此外任何支持JDBC/ODBC的应用都可以向Spark的JDBC/ODBC服务器提交查询,如Tableau、Zeppelin。
这样的话,
一方面:非程序员可以通过Thrift/JDBC/ODBC服务器来使用Spark,利用SQL/HiveSQL来处理和分析数据(只要会写SQL);
另一方面:使得在多个用户可以很方便的共享单个Spark集群。
Spark SQL的Thrift/JDBC/ODBC服务器看上去像是一个数据库,但是却没有存储引擎,只是一个分布式SQL查询引擎。
(4)数据仓库
数据表、系统表和SQL查询引擎。
Spark SQL可以用于创建一个开源的数据仓库解决方案。
7.实例
以Spark1.6.2和Spark2.2.0为例说明,笔者目前用的是1.6.2
7.1 创建DataFrame
——using spark.read.json(spark 2.0以上)
——using sqlContext.read.json(spark 2.0以下)
(1)生成自己的JSON数据
> stringJSONRDD = sc.parallelize((
>
> { "id": "123",
>
> "name": "Katie",
>
> "age": 19,
>
> "eyeColor": "brown"
>
> },
>
> {
>
> "id": "234",
>
> "name": "Michael",
>
> "age": 22,
>
> "eyeColor": "green"
>
> },
>
> {
>
> "id": "345",
>
> "name": "Simone",
>
> "age": 23,
>
> "eyeColor": "blue"
>
> }))
> stringJSONRDD.take(1)
> [{'age': 19, 'eyeColor': 'brown', 'id': '123', 'name': 'Katie'}]
(2)生成创建DataFrame
> # Spark 2.0+
>
> # swimmersJSON = spark.read.json(stringJSONRDD)
>
> # Spark 1.6.2
>
> swimmersJSON = sqlContext.read.json(stringJSONRDD)
>
> swimmersJSON.show()
> +---+--------+---+-------+
>
> |age|eyeColor| id| name|
>
> +---+--------+---+-------+
>
> | 19| brown|123| Katie|
>
> | 22| green|234|Michael|
>
> | 23| blue|345| Simone|
>
> +---+--------+---+-------+
(3)创建一个临时表swimmersJSON ——与RDD一样,仅仅是DataFrame的一次转换操作,没有动作操作
> # Spark 2.0+
>
> # swimmersJSON.createOrReplaceTempView("swimmersJSON")
>
> # Spark 1.6.2
>
> swimmersJSON.registerTempTable("swimmersJSON")
(4)方法一:DataFrame API查询
> swimmersJSON.show()
> +---+--------+---+-------+
>
> |age|eyeColor| id| name|
>
> +---+--------+---+-------+
>
> | 19| brown|123| Katie|
>
> | 22| green|234|Michael|
>
> | 23| blue|345| Simone|
>
> +---+--------+---+-------+
方法二:SQL查询
> # Spark 2.0+
>
> # spark.sql("select * from swimmersJSON").collect()
>
> # Spark 1.6.2
>
> # 可以使用.show()和.collect() 建议使用.show()或者.take()
>
> sqlContext.sql("select * from swimmersJSON").collect()
> [Row(age=19, eyeColor='brown', id='123', name='Katie'),
>
> Row(age=22, eyeColor='green', id='234', name='Michael'),
>
> Row(age=23, eyeColor='blue', id='345', name='Simone')]
> sqlContext.sql("select * from swimmersJSON").show()
> +---+--------+---+-------+
>
> |age|eyeColor| id| name|
>
> +---+--------+---+-------+
>
> | 19| brown|123| Katie|
>
> | 22| green|234|Michael|
>
> | 23| blue|345| Simone|
>
> +---+--------+---+-------+
7.2 RDD变换到DataFrame
(1)使用反射推断模式Inferring the Schema Using Reflection
> # Print the schema
>
> swimmersJSON.printSchema()
> root
>
> |-- age: long (nullable = true)
>
> |-- eyeColor: string (nullable = true)
>
> |-- id: string (nullable = true)
>
> |-- name: string (nullable = true)
(2)使用编程制定模式Programmatically Specifying the Schema
> # Spark SQL引入数据类型,pyspark.sql.types
>
> from pyspark.sql.types import *
>
> stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])
>
> schemaString = "id name age eyeColor"
>
> schema = StructType([
>
> StructField("id", LongType(), True),
>
> StructField("name", StringType(), True),
>
> StructField("age", LongType(), True),
>
> StructField("eyeColor", StringType(), True)
>
> ])
>
> ############################################################## Apply the schema to the RDD and Create DataFrame
>
> # Spark 2.0+
>
> # swimmers = spark.createDataFrame(stringCSVRDD, schema)
>
> # Spark1.6.2
>
> swimmers = sqlContext.createDataFrame(stringCSVRDD, schema)
>
> ############################################################## Creates a temporary view using the DataFrame
>
> # Spark 2.0+
>
> # swimmers.createOrReplaceTempView("swimmers")
>
> # Spark1.6.2
>
> swimmers.registerTempTable("swimmers")
> swimmers.printSchema()
> root
>
> |-- id: long (nullable = true)
>
> |-- name: string (nullable = true)
>
> |-- age: long (nullable = true)
>
> |-- eyeColor: string (nullable = true)
7.3 利用DataFrame(Spark SQL) API进行查询
一般有两种方法:
方法一:将DataFrame注册成一张临时表利用SQL进行查询。(不推荐使用,还需要注册临时表,不到万不得已,不适用)
方法二:直接使用DataFrame ,如swimmers 直接查询;(推荐使用)
(1)利用SQL查询——就是写SQL没什么好说的
> # spark2.0+
>
> # spark.sql("select * from swimmers").show()
>
> # spark1.6.2
>
> sqlContext.sql("select * from swimmers").show()
> # spark2.0+
>
> # spark.sql("select count(1) from swimmers").show()
>
> # spark1.6.2
>
> sqlContext.sql("select count(*) from swimmers").show()
> # spark2.0+
>
> spark.sql("select id,age from swimmers where age=22").show()
>
> # spark1.6.2
>
> sqlContext.sql("select id,age from swimmers where age=22").show()
> # spark2.0+
>
> # spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
>
> # spark1.6.2
>
> sqlContext.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
(2)利用DataFrame API进行查询——推荐使用(较为简单)
> swimmers.show()
>
> swimmers.count()
>
> swimmers.select(["id","age"]).filter("age=22").show()
>
> swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()
>
> swimmers.select(["name", "eyeColor"]).filter("eyeColor like 'b%'").show()
这里推荐读者使用PySpark Shell,自行查看DataFrame的属性和方法。
7.5 案例分析
> df = sqlContext.read.json('file:///root/data/20170614103000.json')
>
> df.show(3)
> +--------------------+
>
> | RoadSegState|
>
> +--------------------+
>
> |[227,201706141030...|
>
> |[227,201706141030...|
>
> |[295,201706141030...|
>
> +--------------------+
> df.printSchema()
> root
>
> |-- RoadSegState: struct (nullable = true)
>
> | |-- DateTimeDelay: long (nullable = true)
>
> | |-- Datetime: string (nullable = true)
>
> | |-- Description: string (nullable = true)
>
> | |-- IntersectionDelay: long (nullable = true)
>
> | |-- IsRoadIntersection: string (nullable = true)
>
> | |-- MobileNumber: long (nullable = true)
>
> | |-- Number: long (nullable = true)
>
> | |-- RoadSegID: string (nullable = true)
>
> | |-- SigNumber: long (nullable = true)
>
> | |-- Speed: double (nullable = true)
>
> | |-- SpeedDiff: double (nullable = true)
>
> | |-- State: string (nullable = true)
>
> | |-- Time: double (nullable = true)
> type(df)
> pyspark.sql.dataframe.DataFrame
> df1 = df.select([df.RoadSegState.DateTimeDelay.alias("DateTimeDelay"),
>
> df.RoadSegState.DateTime.alias("DateTime"),
>
> df.RoadSegState.Description.alias("Description"),
>
> df.RoadSegState.IntersectionDelay.alias("IntersectionDelay"),
>
> df.RoadSegState.IsRoadIntersection.alias("IsRoadIntersection"),
>
> df.RoadSegState.MobileNumber.alias("MobileNumber"),
>
> df.RoadSegState.Number.alias("Number"),
>
> df.RoadSegState.RoadSegID.alias("RoadSegID"),
>
> df.RoadSegState.SigNumber.alias("SigNumber"),
>
> df.RoadSegState.Speed.alias("Speed"),
>
> df.RoadSegState.SpeedDiff.alias("SpeedDiff"),
>
> df.RoadSegState.State.alias("State"),
>
> df.RoadSegState.Time.alias("Time")])
> df1.printSchema()
> root
>
> |-- DateTimeDelay: long (nullable = true)
>
> |-- DateTime: string (nullable = true)
>
> |-- Description: string (nullable = true)
>
> |-- IntersectionDelay: long (nullable = true)
>
> |-- IsRoadIntersection: string (nullable = true)
>
> |-- MobileNumber: long (nullable = true)
>
> |-- Number: long (nullable = true)
>
> |-- RoadSegID: string (nullable = true)
>
> |-- SigNumber: long (nullable = true)
>
> |-- Speed: double (nullable = true)
>
> |-- SpeedDiff: double (nullable = true)
>
> |-- State: string (nullable = true)
>
> |-- Time: double (nullable = true)
> df1.select(['RoadSegID','SigNumber','Speed','SpeedDiff','State','Time']).show(10)
> +------------+---------+-----+---------+-----+-----+
>
> | RoadSegID|SigNumber|Speed|SpeedDiff|State| Time|
>
> +------------+---------+-----+---------+-----+-----+
>
> |000100016645| 9|83.86| 3.27| 3| 86.9|
>
> |000100016644| 34|83.86| 3.27| 3| 78.0|
>
> |000100166451| 4|94.71| 0.0| 3| 83.4|
>
> |000100166450| 4|94.71| 0.0| 3| 91.5|
>
> |000100015970| 0|42.75| 0.0| 2|187.7|
>
> |000100165058| 1|94.71| 0.0| 3|128.6|
>
> |000100016651| 0|42.48| 0.0| 2|158.9|
>
> |000100164791| 1|94.71| 0.0| 3| 17.4|
>
> |000100016650| 6|31.72| 1.07| 2|213.3|
>
> |000100166349| 6|94.71| 0.0| 3| 81.4|
>
> +------------+---------+-----+---------+-----+-----+
>
> only showing top 10 rows
7.6 保存DataFrame
(1)普通保存
> df.write.format("json").save("home")
(2)带分区保存
> #partitionBy方法指定分区避免大量的磁盘I/O操作。下面的例子中根据city这一列进行了分区,所以在home目录下会为每一个city创建一个子目录。如city=("beijing","shanghai",...,"wuhan")
>
> df.write.format("parquet").partitionBy("cify").save("home")
>
> # 下面仅读取城市为“shanghai”的子目录,跳过home下的其他city子目录,对于一个有上百个city的数据集而言,这将有若干个数量级的应用性能提升。
>
> df = sqlContext.read.format("parquet").load("home")
>
> df.registerTempTable("home")
>
> home_city = sqlContext.sql("select * from home where city = 'shanghai'")
(3)JDBC保存
Spark JDBC writer supports following modes:
> append: Append contents of this :class:DataFrame to existing data.
>
> overwrite: Overwrite existing data.
>
> ignore: Silently ignore this operation if data already exists.
>
> error (default case): Throw an exception if data already exists.(默认Spark SQL会抛出异常)
> mode = 'overwrite'
>
> url = "jdbc:postgresql://localhost/shanghai "
>
> #url = "jdbc:mysql://localhost/shanghai "
>
> #url = "jdbc:oracle://localhost/shanghai"
>
> properties = {
>
> "user": "root",
>
> "password": "123456"
>
> }
>
> df.write.jdbc(url=url, table="test", mode=mode, properties=properties)
(4)DataFrameWriter类特别方法用于数据读写数据源
> df.write.json(path)
>
> df.write.parquet(path)
>
> df.write.saveAsTable("hive_table_name")