SparkSQL简介
一、 如何运行Spark SQL 查询
1.1、Spark SQL CLI
要启动Spark SQL CLI ,请在Spark目录中运行以下内容
./bin/spark-sql
可以运行./bin/spark-sql -help
来查看所有的可选选项的完整列表
1.2、Spark的可编程SQL接口
可以通过SparkSession
对象上的sql方法执行sql
语句,将返回一个DataFrame
,这是一个非常强大的接口,因为有些转换操作通过SQL代码表达要比DataFrame表达简单得多
spark.sql(
"""
|select user_id, department, first_name from professors where department in
|(select name from department where created_date >= '2016-01-01')
|""".stripMargin)
1.3、SparkSQL Thrift JDBC/ODBC 服务器
Spark 提供了 一个 Java数据库连接(JDBC)接口,通过它你或远程程序可以连接到Spark驱动器,以便执行SparkSQL查询。
要启动JDBC/ODBC服务器,请在Spark目录下运行以下内容
./sbin/start-thriftserver.sh
此脚本支持全部bin/spark-submit
命令行选项
要查看配置此Thrfit服务器的所有可用选项,请运行./sbin/start-thriftserver.sh --help
.
默认情况下,服务器监听localhost:10000
可以通过更改环境变量或系统属性来更新该监听地址和端口:
对于环境变量配置,请使用以下方法:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri>
对于系统属性,可以参考下面:
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
二、 Catalog
Spark SQL 中最高级别的抽象是Catalog
。Catalog
是一个抽象,用于存储用户数据中的元数据以及其他有用的东西,如数据库,数据表,函数和视图。
2.1 数据表
Spark SQL在执行任何操作之前首先需要定义数据表,数据表在逻辑上等同于DataFrame,但核心区别在于 DataFrame是在编程语言范围内定义的,而数表是在数据库定义的
- 创建表
Spark允许你从某数据源直接创建表,从文件中读取数据时,你甚至可以指定各种复杂的选项。
CREATE TABLE flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path 'data/flight-data/json/2015-summary.json')
--还可以想表中的某些列添加注释
CREATE TABLE flights_csv (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME, STRING COMMENT 'remember ,the US will be most prevalent',
count LONG)
USING csv OPTIONS (header true,path '/data/flight-data/csv/2015-summary.csv')
-- 你也可以从查询创建表
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
- using 和 stored as
using语法规范具有重要意义,如果没有使用using 指定 格式,则spark将默认为Hive SerDe配置,但是Hive SerDes 比Spark的本机序列化要满得多。Hive用户还可以使用stored as 语法来指定这是一个Hive表。
- 创建外部表
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING,
count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/'
-- 还可以从select子句创建外部表
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights
- 插入表
INSERT INTO flights_from_select
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
-- 插入分区表
INSERT INTO partitioned_flights PARTITION (DEST_COUNTRY_NAME = "UBITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DESR_COUNTRY_NAME = 'UNITED STATES' LIMIT 12
- 描述表的元数据
DESCRIBE TABLE flights
- 刷新表的元数据
REFRESH TABLE flights
MSCK REPAIR TABLE flights
- 删除表
DROP TABLE flights;
DROP TABLE IF EXISTS flights_csv;
- 缓存表
CACHE TABLE flights
UNCACHE TABLE flights
2.2 视图
在创建了一个表后,就可以定义视图了。定义视图即指定基于现有表的一组转换操作,基本上只是保存查询计划,这样可以方便组织或重用查询逻辑
- 创建视图
CREATE VIEW just_usa_view
AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
-- 可以创建仅在当前会话期间可用,且未注册到数据库的临时视图
CREATE TEMP VIEW just_usa_view_temp
AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
CREATE GLOBAL TEMP VIEW just_usa_view_temp
AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
-- 显示指定是否覆盖已存在的视图
CREATE OR REPLACE TEMP VIEW just_usa_view_temp
AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
--可以像查询数据表一样查询视图
SELECT * FROM just_usa_view_temp
视图实际上是一种转换,Spark只会在查询时执行它
- 删除视图
DROP VIEW IF EXISTS just_usa_view
2.3 数据库
- 创建数据库
CREATE DATABASE some_db
- 选择数据库
use some_db
-- 列出当前数据库中的所有数据表
show tables
-- 查看当前正在使用的数据库
SELECT current_database()
- 删除数据库
DROP DATABASE IF EXISTS some_db
三、 高级主题
3.1 复杂类型
SparkSQL中支持三种复杂类型:结构体(struct)、列表(list)和映射(map)。
- 结构体
CREATE VIEW IF NOT EXISTS nested_data
AS SELECT
-- 创建一个结构体
(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country,
count FROM flights
--查询结构体中的某一列
SELECT country.DEST_COUTRY_NAME, count from nested_data
--查询结构体中所有值
SELECT country.*, count FROM nested_data
- 列表
在Spark SQL 中有两种创建列表的方式:
collect_list 创建一个包含值的列表
collect_set创建一个不含有重复值的列表
select dest_country_name as new_name, collect_list(count) as flights_counts,
collect_set(origin_country_name) as origin_set
from flights group by dest_country_name
--可以通过设定值的方法来手动地创建数组
select dest_country_name, ARRAY(1, 2, 3) from flights
- 函数
若要在Spark SQL中的函数列表,可以使用SHOW FUNCTIONS 语句
show functions
-- 指定查询系统函数
show system functions
-- 指定查询用户函数
show user functions
-- 可以通过传递带有通配符 "*" 的字符串来实现过滤选择
show functions "s*";
-- 也可以包括LIKE关键字
show functions like "collect*";
- 子查询
在Spark中有两个基本子查询:
相关子查询: 使用来自查询外的一些信息
不相关子查询:不包括外部的信息
- 不相关谓词子查询
--此查询是不相关的,因为它不包含来查询外部的信息,这是一个可以自行运行的查询
select * from flights where origin_country_name in (
select dest_country_name from flights group by dest_country_name order by sum(count) desc limit 5
)
- 相关谓词子查询
相关谓词子查询允许在内部查询中使用外部作用域的信息。
select * from f1 where exists
(select 1 from flights f2 where f1.dest_country_name = f2.origin_country_name)
and exists
(select 1 from flights f2 where f2.dest_country_name = f1.origin_country_name)
- 不相关子标量查询
使用不相关的标量查询scalar query,可以引入一些以前可能没有的补充信息
select *, (select max(count) from flights) as maximum from flights
2.3 其他功能
- 配置
peoperty name | default | meaning |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 如果设置为true,则Spark SQL会根据数据的统计信息 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制柱状缓存的批处理大小。较大的批处理可以提高内存利用率和压缩能力,但在缓存数据时有OutOfMemoryErrors的风险 |
spark.sql.files.maxPartitionBytes | 134217728(128M)单个分区中的最大字节数 | |
spark.sql.files.openCostInBytes | 4194304(4MB) | 打开一个文件的开销估计,即同时可以扫描的字节数量,这个配置会在将多个文件并入一个分区时用到,这个参数最好配置的大一些,因为装着小文件的分区往往会比装更大的分区运行更快 |
spark.sql.broadcastTimeout | 300 | 广播连接中广播等待时间的超时秒数(以秒为单位) |
spark.sql.autoBroadcastJoinThreshold | 10485760(10MB) | 配置在执行连接时,将广播给所有节点的表的最大大小。可以将此值设置为-1来禁用广播 |
spark.sql.shuffle.partitions | 200 | 配置在为连接或聚合shuffle数据时要使用的分区数 |