pyspark学习我爱编程

4 Spark SQL

2018-01-09  本文已影响190人  7125messi

DataFrame是一种不可变的分布式数据集,类似于Python pandas DataFrame,允许用户轻松地使用结构化的数据(如数据表)。这样我们就可以Spark SQL来查询结构化的数据或使用Spark表达式方法(注意,这里不使用lambda表达式)。

以前的RDD,由于JAVA的JVM和Python的Py4J之间的通信开销,使得Python相对于Scala和JAVA执行查询会明显变慢。而现在Spark使用Catalyst Optimizer能够显著提高Spark的查询性能。

1.Python和RDD之间的通信

image

每次使用RDD 执行PySpark程序时,潜在地需要巨大的开销来执行作业,在PySpark驱动器中,Spark Context通过Py4J启动一个使用JavaSparkContext的JVM。所有的RDD转换最初都映射到Java中的PythonRDD对象。一旦这些任务被推送到Spark的工作节点,PythonRDD对象就使用管道pipe启动Python的子进程中,发送代码和数据到Python中进行处理。

虽然该方法允许PySpark将数据处理分布到多个工作节点的多个Python子进程中,但是如你所见,Python和JVM之间还是有很多上下文切换和通信开销的。

所以处理大数据量时不建议在PySpark中使用RDD。

2.Catalyst Optimizer(Catalyst优化器)

Spark SQL引擎如此之快的原因在于Catalyst Optimizer。

image

Catalyst Optimizer编译优化了逻辑计划,而且还有一个能够确保生成最有效的物理计划的成本优化器。

3.利用DataFrame加速PySpark

DataFrame和Catalyst优化器配合使用(相较于RDD)可以大大提高PySpark的查询性能。

引入DataFrame前,Python查询速度普遍比使用RDD 的Scala查询慢,主要源于Python和JVM之间的通信开销。

image

注意:虽然DataFrame可以提升PySpark性能,但是有一些例外,如Python UDF(User-Defined-Function)的使用。

4.Spark SQL的特点

Spark SQL提供了比Spark Core更为高层的用于处理结构化数据的抽象。结构化数据包括存储在RDBMS数据库中的数据和NoSQL数据库中的数据,以及诸如Parquet JSON CSV等数据。

Spark SQL不仅为Spark提供了SQL接口,它还有更为宽泛的设计目的:让Spark更加易用,提升开发者的生产力,让Spark应用运行得更快。

Spark SQL支持多种查询语言,包括SQL、HiveSQL等,还可以用于交互分析,无论哪种场景,内部都会调用Spark Core API在Spark集群上执行查询操作。

(1)和其他Spark库集成

Spark SQL可以用于交互分析和对历史数据进行批处理;

Spark SQL可以和Spark Streaming一起进行实时数据流处理;

Spark SQL可以和Spark ML一起在机器学习应用中使用,例如Spark SQL可以用于特征工程。

(2)可用性

相较于Spark Core API,Spark SQL API为多种功能提供了函数,其中包括:选择列、过滤行、聚合列、合并数据集以及其他数据处理、分析中的常见任务。

相较于Spark Core API,Spark SQL API只需要更少的代码就能处理结构化数据。

(3)数据源

Spark SQL支持多种数据源,可以处理文件、RDBMS数据库、NoSQL数据库。文件可以位于本地或者HDFS 、S3。Spark SQL支持的文件格式包括CSV、JSON、Parquet等。

Spark SQL支持的RDBMS数据库有Oracle、 MySQL、 PostgreSQL、 MS SQL Server,以及其他可以使用JDBC连接的数据库。

Spark SQL支持的NoSQL数据库有HBase、Cassandra。

(4)数据处理接口

Spark SQL支持三种数据处理接口:SQL、HiveSQL和集成语言的查询。可以用Spark SQL来替代Hive从而获得更好的性能。

5.Spark SQL提升性能

Spark SQL采取多种技术来让数据处理应用运行得更快,包括减少磁盘I/O、内存列式缓存、查询优化和代码生成。

(1)磁盘I/O

磁盘I/O是缓慢的,它会显著影响查询执行时间,Spark SQL在读取数据时会跳过没被查询到的分区、行、列。

(2)分区

(3)列存储:如Parquet格式文件

(4)内存中的列式缓存

(5)行跳过

(6)谓词下推降低磁盘I/O

(7)查询优化:Catalyst优化器(分析、逻辑优化、物理计划和代码生成)

6.Spark SQL应用

(1)ETL

(2)数据可视化

(3)分布式JDBC/ODBC/ SQL查询引擎

Spark SQL预先打包了一个Thrift/JDBC/ODBC服务器,客户端应用可以直接连接到这个服务器,并通过Thrift/JDBC/ODBC接口提交SQL/HiveSQL查询。

Spark SQL自带了一个名为Beeline的命令行客户端,用于提交HiveSQL查询。此外任何支持JDBC/ODBC的应用都可以向Spark的JDBC/ODBC服务器提交查询,如Tableau、Zeppelin。

这样的话,

一方面:非程序员可以通过Thrift/JDBC/ODBC服务器来使用Spark,利用SQL/HiveSQL来处理和分析数据(只要会写SQL);

另一方面:使得在多个用户可以很方便的共享单个Spark集群。

Spark SQL的Thrift/JDBC/ODBC服务器看上去像是一个数据库,但是却没有存储引擎,只是一个分布式SQL查询引擎。

(4)数据仓库

数据表、系统表和SQL查询引擎。

Spark SQL可以用于创建一个开源的数据仓库解决方案。

7.实例

以Spark1.6.2和Spark2.2.0为例说明,笔者目前用的是1.6.2

7.1 创建DataFrame

——using spark.read.json(spark 2.0以上)

——using sqlContext.read.json(spark 2.0以下)

(1)生成自己的JSON数据

> stringJSONRDD = sc.parallelize((
> 
>   { "id": "123",
> 
>     "name": "Katie",
> 
>     "age": 19,
> 
>     "eyeColor": "brown"
> 
>   },
> 
>   {
> 
>     "id": "234",
> 
>     "name": "Michael",
> 
>     "age": 22,
> 
>     "eyeColor": "green"
> 
>   },
> 
>   {
> 
>     "id": "345",
> 
>     "name": "Simone",
> 
>     "age": 23,
> 
>     "eyeColor": "blue"
> 
>   }))

> stringJSONRDD.take(1)

> [{'age': 19, 'eyeColor': 'brown', 'id': '123', 'name': 'Katie'}]

(2)生成创建DataFrame

> # Spark 2.0+
> 
> # swimmersJSON = spark.read.json(stringJSONRDD)
> 
> # Spark 1.6.2
> 
> swimmersJSON = sqlContext.read.json(stringJSONRDD)
> 
> swimmersJSON.show()

> +---+--------+---+-------+
> 
> |age|eyeColor| id|  name|
> 
> +---+--------+---+-------+
> 
> | 19|  brown|123|  Katie|
> 
> | 22|  green|234|Michael|
> 
> | 23|    blue|345| Simone|
> 
> +---+--------+---+-------+

(3)创建一个临时表swimmersJSON ——与RDD一样,仅仅是DataFrame的一次转换操作,没有动作操作

> # Spark 2.0+
> 
> # swimmersJSON.createOrReplaceTempView("swimmersJSON")
> 
> # Spark 1.6.2
> 
> swimmersJSON.registerTempTable("swimmersJSON")

(4)方法一:DataFrame API查询

> swimmersJSON.show()

> +---+--------+---+-------+
> 
> |age|eyeColor| id|  name|
> 
> +---+--------+---+-------+
> 
> | 19|  brown|123|  Katie|
> 
> | 22|  green|234|Michael|
> 
> | 23|    blue|345| Simone|
> 
> +---+--------+---+-------+

方法二:SQL查询

> # Spark 2.0+
> 
> # spark.sql("select * from swimmersJSON").collect()
> 
> # Spark 1.6.2
> 
> # 可以使用.show()和.collect()   建议使用.show()或者.take()
> 
> sqlContext.sql("select * from swimmersJSON").collect()

> [Row(age=19, eyeColor='brown', id='123', name='Katie'),
> 
> Row(age=22, eyeColor='green', id='234', name='Michael'),
> 
> Row(age=23, eyeColor='blue', id='345', name='Simone')]

> sqlContext.sql("select * from swimmersJSON").show()

> +---+--------+---+-------+
> 
> |age|eyeColor| id|  name|
> 
> +---+--------+---+-------+
> 
> | 19|  brown|123|  Katie|
> 
> | 22|  green|234|Michael|
> 
> | 23|    blue|345| Simone|
> 
> +---+--------+---+-------+

7.2 RDD变换到DataFrame

(1)使用反射推断模式Inferring the Schema Using Reflection

> # Print the schema
> 
> swimmersJSON.printSchema()

> root
> 
> |-- age: long (nullable = true)
> 
> |-- eyeColor: string (nullable = true)
> 
> |-- id: string (nullable = true)
> 
> |-- name: string (nullable = true)

(2)使用编程制定模式Programmatically Specifying the Schema

> # Spark SQL引入数据类型,pyspark.sql.types
> 
> from pyspark.sql.types import *
> 
> stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])
> 
> schemaString = "id name age eyeColor"
> 
> schema = StructType([
> 
>     StructField("id", LongType(), True),   
> 
>     StructField("name", StringType(), True),
> 
>     StructField("age", LongType(), True),
> 
>     StructField("eyeColor", StringType(), True)
> 
> ])
> 
> ############################################################## Apply the schema to the RDD and Create DataFrame
> 
> # Spark 2.0+
> 
> # swimmers = spark.createDataFrame(stringCSVRDD, schema)
> 
> # Spark1.6.2
> 
> swimmers = sqlContext.createDataFrame(stringCSVRDD, schema)
> 
> ############################################################## Creates a temporary view using the DataFrame
> 
> # Spark 2.0+
> 
> # swimmers.createOrReplaceTempView("swimmers")
> 
> # Spark1.6.2
> 
> swimmers.registerTempTable("swimmers")

> swimmers.printSchema()

> root
> 
> |-- id: long (nullable = true)
> 
> |-- name: string (nullable = true)
> 
> |-- age: long (nullable = true)
> 
> |-- eyeColor: string (nullable = true)

7.3 利用DataFrame(Spark SQL) API进行查询

一般有两种方法:

方法一:将DataFrame注册成一张临时表利用SQL进行查询。(不推荐使用,还需要注册临时表,不到万不得已,不适用)

方法二:直接使用DataFrame ,如swimmers 直接查询;(推荐使用)

(1)利用SQL查询——就是写SQL没什么好说的

> # spark2.0+
> 
> # spark.sql("select * from swimmers").show()
> 
> # spark1.6.2
> 
> sqlContext.sql("select * from swimmers").show()

> # spark2.0+
> 
> # spark.sql("select count(1) from swimmers").show()
> 
> # spark1.6.2
> 
> sqlContext.sql("select count(*) from swimmers").show()

> # spark2.0+
> 
> spark.sql("select id,age from swimmers where age=22").show()
> 
> # spark1.6.2
> 
> sqlContext.sql("select id,age from swimmers where age=22").show()

> # spark2.0+
> 
> # spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
> 
> # spark1.6.2
> 
> sqlContext.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

(2)利用DataFrame API进行查询——推荐使用(较为简单)

> swimmers.show()
> 
> swimmers.count()
> 
> swimmers.select(["id","age"]).filter("age=22").show()
> 
> swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()
> 
> swimmers.select(["name", "eyeColor"]).filter("eyeColor like 'b%'").show()

这里推荐读者使用PySpark Shell,自行查看DataFrame的属性和方法。

7.5 案例分析

> df = sqlContext.read.json('file:///root/data/20170614103000.json')
> 
> df.show(3)

> +--------------------+
> 
> |        RoadSegState|
> 
> +--------------------+
> 
> |[227,201706141030...|
> 
> |[227,201706141030...|
> 
> |[295,201706141030...|
> 
> +--------------------+

> df.printSchema()

> root
> 
> |-- RoadSegState: struct (nullable = true)
> 
> |    |-- DateTimeDelay: long (nullable = true)
> 
> |    |-- Datetime: string (nullable = true)
> 
> |    |-- Description: string (nullable = true)
> 
> |    |-- IntersectionDelay: long (nullable = true)
> 
> |    |-- IsRoadIntersection: string (nullable = true)
> 
> |    |-- MobileNumber: long (nullable = true)
> 
> |    |-- Number: long (nullable = true)
> 
> |    |-- RoadSegID: string (nullable = true)
> 
> |    |-- SigNumber: long (nullable = true)
> 
> |    |-- Speed: double (nullable = true)
> 
> |    |-- SpeedDiff: double (nullable = true)
> 
> |    |-- State: string (nullable = true)
> 
> |    |-- Time: double (nullable = true)

> type(df)

> pyspark.sql.dataframe.DataFrame

> df1 = df.select([df.RoadSegState.DateTimeDelay.alias("DateTimeDelay"),
> 
>           df.RoadSegState.DateTime.alias("DateTime"),
> 
>           df.RoadSegState.Description.alias("Description"),
> 
>           df.RoadSegState.IntersectionDelay.alias("IntersectionDelay"),
> 
>           df.RoadSegState.IsRoadIntersection.alias("IsRoadIntersection"),
> 
>           df.RoadSegState.MobileNumber.alias("MobileNumber"),
> 
>           df.RoadSegState.Number.alias("Number"),
> 
>           df.RoadSegState.RoadSegID.alias("RoadSegID"),
> 
>           df.RoadSegState.SigNumber.alias("SigNumber"),
> 
>           df.RoadSegState.Speed.alias("Speed"),
> 
>           df.RoadSegState.SpeedDiff.alias("SpeedDiff"),
> 
>           df.RoadSegState.State.alias("State"),
> 
>           df.RoadSegState.Time.alias("Time")])

> df1.printSchema()

> root
> 
> |-- DateTimeDelay: long (nullable = true)
> 
> |-- DateTime: string (nullable = true)
> 
> |-- Description: string (nullable = true)
> 
> |-- IntersectionDelay: long (nullable = true)
> 
> |-- IsRoadIntersection: string (nullable = true)
> 
> |-- MobileNumber: long (nullable = true)
> 
> |-- Number: long (nullable = true)
> 
> |-- RoadSegID: string (nullable = true)
> 
> |-- SigNumber: long (nullable = true)
> 
> |-- Speed: double (nullable = true)
> 
> |-- SpeedDiff: double (nullable = true)
> 
> |-- State: string (nullable = true)
> 
> |-- Time: double (nullable = true)

> df1.select(['RoadSegID','SigNumber','Speed','SpeedDiff','State','Time']).show(10)

> +------------+---------+-----+---------+-----+-----+
> 
> |  RoadSegID|SigNumber|Speed|SpeedDiff|State| Time|
> 
> +------------+---------+-----+---------+-----+-----+
> 
> |000100016645|        9|83.86|    3.27|    3| 86.9|
> 
> |000100016644|      34|83.86|    3.27|    3| 78.0|
> 
> |000100166451|        4|94.71|      0.0|    3| 83.4|
> 
> |000100166450|        4|94.71|      0.0|    3| 91.5|
> 
> |000100015970|        0|42.75|      0.0|    2|187.7|
> 
> |000100165058|        1|94.71|      0.0|    3|128.6|
> 
> |000100016651|        0|42.48|      0.0|    2|158.9|
> 
> |000100164791|        1|94.71|      0.0|    3| 17.4|
> 
> |000100016650|        6|31.72|    1.07|    2|213.3|
> 
> |000100166349|        6|94.71|      0.0|    3| 81.4|
> 
> +------------+---------+-----+---------+-----+-----+
> 
> only showing top 10 rows

7.6 保存DataFrame

(1)普通保存

> df.write.format("json").save("home")

(2)带分区保存

> #partitionBy方法指定分区避免大量的磁盘I/O操作。下面的例子中根据city这一列进行了分区,所以在home目录下会为每一个city创建一个子目录。如city=("beijing","shanghai",...,"wuhan")
> 
> df.write.format("parquet").partitionBy("cify").save("home")    
> 
> # 下面仅读取城市为“shanghai”的子目录,跳过home下的其他city子目录,对于一个有上百个city的数据集而言,这将有若干个数量级的应用性能提升。
> 
> df = sqlContext.read.format("parquet").load("home")
> 
> df.registerTempTable("home")
> 
> home_city = sqlContext.sql("select * from home where city = 'shanghai'")

(3)JDBC保存

Spark JDBC writer supports following modes:

> append:                   Append contents of this :class:DataFrame to existing data.
> 
> overwrite:                Overwrite existing data.
> 
> ignore:                     Silently ignore this operation if data already exists.
> 
> error (default case): Throw an exception if data already exists.(默认Spark SQL会抛出异常)

> mode = 'overwrite'
> 
> url = "jdbc:postgresql://localhost/shanghai "
> 
> #url = "jdbc:mysql://localhost/shanghai "
> 
> #url = "jdbc:oracle://localhost/shanghai"
> 
> properties = {
> 
>     "user": "root",
> 
>     "password": "123456"
> 
> }
> 
> df.write.jdbc(url=url, table="test", mode=mode, properties=properties)

(4)DataFrameWriter类特别方法用于数据读写数据源

> df.write.json(path)
> 
> df.write.parquet(path)
> 
> df.write.saveAsTable("hive_table_name")
上一篇 下一篇

猜你喜欢

热点阅读