Spark SQL
1. What is Spark SQL?
Spark SQL is the Spark module for processing structured data. It provides two programming abstractions, DataFrame and Dataset, and acts as a distributed SQL query engine that parses traditional SQL into Spark's big-data computation model. The figure below shows how RDD, DataFrame, and Dataset relate.
[Figure: the relationship between RDD, DataFrame, and Dataset]
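A rough sketch of how the three relate, assuming a spark-shell session (where spark.implicits._ is already imported); the Person class and the sample rows are made up for illustration:
case class Person(name: String, age: Int)
val rdd = sc.parallelize(Seq(Person("Andy", 30), Person("Justin", 19)))  // RDD[Person]: typed JVM objects, but Spark knows no schema
val df  = rdd.toDF()       // DataFrame: rows plus a schema (name: string, age: int)
val ds  = df.as[Person]    // Dataset[Person]: schema plus compile-time types
ds.show()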
2. Hive and Spark SQL
Hive translates Hive SQL into MapReduce jobs and submits them to the cluster, which greatly simplifies writing MapReduce programs; however, the MapReduce computation model is relatively slow to execute. Spark SQL arose to address this: it translates SQL into RDD operations and submits them to the cluster, so execution is much faster. As an analogy: Hive---SQL-->MapReduce, Spark SQL---SQL-->RDD.
3. Test data
We use two CSV files as test data.
dept.csv:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp.csv:
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
Put these two CSV files into the HDFS directory hdfs://hadoop3:8020/input/ so they can be used later.
First start HDFS:
start-dfs.sh
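Then upload the two files (a sketch; adjust the local paths to wherever the CSV files were saved):
hdfs dfs -mkdir -p /input
hdfs dfs -put dept.csv emp.csv /input/
Verify the upload: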
[root@bigdata111 ~]# hdfs dfs -ls /input
-rw-r--r-- 3 root supergroup 80 2019-03-01 05:05 /input/dept.csv
-rw-r--r-- 3 root supergroup 603 2019-03-01 05:05 /input/emp.csv
4. Creating a DataFrame
1) Start the Spark cluster
spark-start-all.sh
2) Start the Spark shell
spark-shell
3) Method 1: define the table with a case class
scala> val rdd=sc.textFile("hdfs://hadoop3/input/dept.csv")
scala> val rdd2=rdd.filter(_.length>0).map(_.split(","))
scala> case class Dept(deptno:Int,dname:String,loc:String)
scala> val deptrdd=rdd2.map(x => Dept(x(0).toInt, x(1), x(2)))
scala> val deptdf=deptrdd.toDF
Output:
Equivalent SQL: select * from dept
scala> deptdf.show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
Equivalent SQL: desc dept
scala> deptdf.printSchema
root
|-- deptno: integer (nullable = false)
|-- dname: string (nullable = true)
|-- loc: string (nullable = true)
4) Method 2: create the DataFrame from a SparkSession with an explicit schema
scala>val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(",")) // read from the local Linux filesystem
scala>val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(",")) // read from HDFS
scala>import org.apache.spark.sql._
scala>import org.apache.spark.sql.types._
scala>val myschema = StructType(List(StructField("empno", DataTypes.IntegerType)
, StructField("ename", DataTypes.StringType)
,StructField("job", DataTypes.StringType)
,StructField("mgr", DataTypes.StringType)
,StructField("hiredate", DataTypes.StringType)
,StructField("sal", DataTypes.IntegerType)
,StructField("comm", DataTypes.StringType)
,StructField("deptno", DataTypes.IntegerType))) // define the schema as a StructType
scala>val rowRDD = lines.map(x=>Row(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)) // map each input line to a Row
scala>val df = spark.createDataFrame(rowRDD,myschema) // create the DataFrame with SparkSession.createDataFrame
Output:
scala> df.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| | 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| | 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| | 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| | 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| | 20|
| 7839| KING|PRESIDENT| |1981/11/17|5000| | 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
| 7876| ADAMS| CLERK|7788| 1987/5/23|1100| | 20|
| 7900| JAMES| CLERK|7698| 1981/12/3| 950| | 30|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| | 20|
| 7934|MILLER| CLERK|7782| 1982/1/23|1300| | 10|
+-----+------+---------+----+----------+----+----+------+
5) Method 3: read a structured file (JSON, CSV, etc.) directly -- the simplest way
Preparation:
The directory /opt/modules/app/spark/examples/src/main/resources ships with ready-made sample files, so upload one of the JSON samples to HDFS:
[root@hadoop3 resources]# pwd
/opt/modules/app/spark/examples/src/main/resources
[root@hadoop3 resources]# hadoop fs -put ./people.json /input/
scala> val peopleDF = spark.read.json("hdfs://hadoop3/input/people.json")
19/03/01 06:16:51 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
Output:
scala> peopleDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
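The same approach works for the CSV test files from Section 3. A rough sketch (assumption: the files have no header row, so we supply the column names ourselves):
scala> val deptCsvDF = spark.read.option("inferSchema", "true").csv("hdfs://hadoop3/input/dept.csv").toDF("deptno", "dname", "loc")
scala> deptCsvDF.show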
5. Operating on DataFrames
1) Add Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.neusoft</groupId>
<artifactId>sparkdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>sparkdemo</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.3.1</spark.version>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2) Query a table
select * from <table>
package com.neusoft

import org.apache.spark.sql.SparkSession

object Sparksqldemo2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("test")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()
    val df = spark.read.json("hdfs://192.168.159.133:8020/input/people.json")
    df.show()
  }
}
3) DataFrame DSL operations
In what follows, empDF denotes the emp DataFrame created in Section 4 (for example, val empDF = df).
1. Show all employee records === select * from empDF;
empDF.show
2. Show all employee names and department numbers (with or without the $ prefix, the behaviour is the same) === select ename,deptno from empDF;
empDF.select("ename","deptno").show
empDF.select($"ename",$"deptno").show
3. Show employee names and salaries, with 100 added to each salary === select ename,sal,sal+100 from empDF;
empDF.select($"ename",$"sal",$"sal"+100).show
4. Employees whose salary is greater than 2000 === select * from empDF where sal>2000;
empDF.filter($"sal" > 2000).show
5. Grouping === select deptno,count(*) from empDF group by deptno;
scala>empDF.groupBy("deptno").count.show
scala>empDF.groupBy("deptno").avg().show
scala>empDF.groupBy($"deptno").max().show
4) SQL operations on a DataFrame
(1) Prerequisite: register the DataFrame as a table or view
df.createOrReplaceTempView("emp")
(2) Run queries through the SparkSession
spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
(3) Total salary per department
spark.sql("select deptno,sum(sal) from emp group by deptno").show
6. Views
When operating on a DataFrame with SQL, the prerequisite is to first create a table or view from the DataFrame:
df.createOrReplaceTempView("emp")
In Spark SQL, if you want a temporary view that can be shared across sessions and stays alive for the lifetime of the application, create a global temporary view. Remember to qualify it with the global_temp prefix when referencing it, because global temporary views are bound to the system-reserved database global_temp.
(1) Create a regular (session-scoped) view and a global view
df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")
(2) Run the queries in the current session; both return results.
scala>spark.sql("select * from emp1").show
scala>spark.sql("select * from global_temp.emp2").show
(3) Open a new session and run the same queries
scala>spark.newSession.sql("select * from emp1").show (fails, because emp1 is scoped to the original session)
scala>spark.newSession.sql("select * from global_temp.emp2").show
7. Data sources
1) Generic load/save functions
(*) What is a Parquet file?
Parquet is a columnar storage file format. Columnar storage has the following core advantages:
- It can skip data that does not match the query and read only the data it needs, reducing I/O volume.
- Compression encodings reduce disk usage; because all values in a column share the same type, more efficient encodings (such as run-length encoding and delta encoding) can be used to save even more space.
- Reading only the required columns and supporting vectorized operations yields better scan performance.
Parquet is Spark SQL's default data source; the default can be changed through the spark.sql.sources.default setting.
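For example, the session default can be switched so that a plain load/save works on JSON instead (a sketch, assuming a running spark-shell session):
scala> spark.conf.set("spark.sql.sources.default", "json")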
(*) Generic load/save functions
- The load function reads Parquet files: scala>val userDF = spark.read.load("hdfs://bigdata111:9000/input/users.parquet")
Compare with the following statements:
scala>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")
Inspect the schema and data: scala>userDF.show
- The save function writes data; the default output format is Parquet (a columnar file format):
scala>userDF.select("name","favorite_color").write.save("/root/temp/result1")
scala>userDF.select("name","favorite_color").write.format("csv").save("/root/temp/result2")
scala>userDF.select("name","favorite_color").write.csv("/root/temp/result3")
(*) Explicitly specifying the file format: loading JSON
Loading it directly: val usersDF = spark.read.load("/root/resources/people.json")
This fails, because load expects Parquet by default. Specify the format explicitly instead:
val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*) Save modes
Storage operations can take a SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. Also, when Overwrite is used, the existing data is deleted before the new data is written. The modes are as follows:
The default is SaveMode.ErrorIfExists: if the target table already exists, an exception is thrown and the data is not written. The other three modes are:
SaveMode.Append: if the table already exists, append to it; if it does not, create the table first and then insert the data.
SaveMode.Overwrite: drop the existing table and its data, recreate the table, and insert the new data.
SaveMode.Ignore: if the table does not exist, create it and write the data; if it does exist, skip the write silently without raising an error.
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> error: /root/result/parquet1 already exists
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
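The mode can also be passed as a SaveMode constant instead of a string; a short sketch using the same usersDF:
scala> import org.apache.spark.sql.SaveMode
scala> usersDF.select($"name").write.mode(SaveMode.Append).save("/root/result/parquet1")  // append to the existing output
scala> usersDF.select($"name").write.mode(SaveMode.Ignore).save("/root/result/parquet1")  // silently skip if the path already exists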
2) Reading from and writing to MySQL
2.1 JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the results can also be written back to the relational database (a shell sketch of the write-back appears at the end of 2.1.1).
2.1.1 Loading data from MySQL (spark-shell)
Start spark-shell; the MySQL JDBC driver jar must be specified:
spark-shell --master spark://hadoop1:7077 --jars mysql-connector-java-5.1.35-bin.jar --driver-class-path mysql-connector-java-5.1.35-bin.jar
Load the data from MySQL:
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url"->"jdbc:mysql://hadoop1:3306/bigdata",
"driver"->"com.mysql.jdbc.Driver",
"dbtable"->"person", // "dbtable"->"(select * from person where id = 12) as person",
"user"->"root",
"password"->"123456")
).load()
Run a query:
jdbcDF.show()
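The loaded jdbcDF can also be written back through the DataFrame JDBC writer, without hand-written JDBC code. A minimal sketch (the target table person_copy is a placeholder, and the credentials are the same ones used above):
scala> import java.util.Properties
scala> val props = new Properties()
scala> props.setProperty("user", "root")
scala> props.setProperty("password", "123456")
scala> props.setProperty("driver", "com.mysql.jdbc.Driver")
scala> jdbcDF.write.mode("append").jdbc("jdbc:mysql://hadoop1:3306/bigdata", "person_copy", props)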
2.1.2 Writing data to MySQL (packaged jar)
pom dependencies: use the same pom.xml as in Section 5 above.
Write the Spark program (this example uses Spark Streaming to consume records from Kafka and write them to MySQL over JDBC):
package com.neusoft

import java.sql
import java.sql.DriverManager
import java.util.Date

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
  * Created by Administrator on 2019/3/7.
  */
object SparkDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("hdfs://hadoop3:8020/spark_check_point")
    // the set of Kafka topics to subscribe to; several topics can be passed via args, separated by commas
    val topicsSet = Set("ss_kafka")
    // Kafka parameters: the broker list
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.159.133:9092,192.168.159.130:9092,192.168.159.134:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    print("---------:" + messages)
    val lines = messages.map(_._2)
    lines.foreachRDD(rdd => {
      // inner function: writes one partition of records to MySQL
      def func(records: Iterator[String]) {
        var conn: sql.Connection = null
        var stmt: sql.PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/test"
          val user = "root"
          val password = "root" // change this to your own MySQL password
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val arr = p.split("\\t")
            val phoneno = arr(0)
            val jingwei = arr(1)
            val arrjingwei = jingwei.split(",")
            // latitude, longitude
            val insertSql = "insert into location(time,latitude,longtitude) values (?,?,?)"
            stmt = conn.prepareStatement(insertSql)
            stmt.setLong(1, new Date().getTime)
            stmt.setDouble(2, java.lang.Double.parseDouble(arrjingwei(0).trim))
            stmt.setDouble(3, java.lang.Double.parseDouble(arrjingwei(1).trim))
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      val repartitionedRDD = rdd.repartition(1)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
Package the program into a jar with the maven-shade-plugin.
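The pom above does not declare the shade plugin; a hedged sketch of the configuration to add under <build><plugins> (the version number is an assumption):
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.1</version> <!-- assumed version -->
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
With this in place, mvn package builds the fat jar.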
Submit the jar to the Spark cluster:
spark-submit \
--class com.neusoft.SparkDemo \
--master spark://hadoop1:7077 \
--jars mysql-connector-java-5.1.35-bin.jar \
--driver-class-path mysql-connector-java-5.1.35-bin.jar \
/root/demo.jar