Hive平滑过渡到Spark Sql
Hive概述
Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,将类 SQL 语句转换为 MapReduce 任务执行。
image.png
Hive产生背景
-
MapReduce编程带来的不便性
MapReduce编程十分繁琐,在大多情况下,每个MapReduce程序需要包含Mapper、Reduceer和一个Driver,之后需要打成jar包扔到集群上运 行。如果mr写完之后,且该项目已经上线,一旦业务逻辑发生了改变,可能就会带来大规模的改动代码,然后重新打包,发布,非常麻烦(这种方式,也是最古老的方式) -
当大量数据都存放在HDFS上,如何快速的对HDFS上的文件进行统计分析操作?
一般来说,想要做会有两种方式:
学Java、学MapReduce(十分麻烦)
做DBA的:写SQL(希望能通过写SQL这样的方式来实现,这种方式较好)
然而,HDFS中最关键的一点就是,数据存储HDFS上是没有schema的概念的
(schema:相当于表里面有列、字段、字段名称、字段与字段之间的分隔符等,这些就是schema信息) ,然而HDFS上的仅仅只是一个纯的文本文件而已 。那么,没有schema,就没办法使用sql进行查询 。因此,在这种背景下,就有问题产生: 如何为HDFS上的文件添加Schema信息?如果加上去,是否就可以通过SQL的方式进行处理了呢?在这种背景下,Hive产生了。
Hive整体架构
image.pngHive架构包括如下组件:CLI(command line interface)、JDBC/ODBC、Thrift Server、Hive WEB Interface(HWI)、metastore和Driver(Complier、Optimizer和Executor)
- Driver组件:核心组件,整个Hive的核心,该组件包括Complier、Optimizer和Executor,它的作用是将我们写的HQL语句进行解析、编译优化,生成执行计划,然后调用底层的MapReduce计算框架。
- Metastore组件:元数据服务组件,这个组件存储hive的元数据,hive的元数据存储在关系数据库里,hive支持的关系数据库有derby、mysql。
- CLI:command line interface,命令行接口。
- ThriftServers:提供JDBC和ODBC接入的能力,它用来进行可扩展且跨语言的服务的开发,hive集成了该服务,能让不同的编程语言调用hive的接口。
- Hive WEB Interface(HWI):hive客户端提供了一种通过网页的方式访问hive所提供的服务。这个接口对应hive的hwi组件(hive web interface)
执行流程示意图
image.pngHive 将通过CLI接入,JDBC/ODBC接入,或者HWI接入的相关查询,通过Driver(Complier、Optimizer和Executor),进行编译,分析优化,最后变成可执行的MapReduce。
Hive 功能有点类似传统的数据库引擎(如mysql),解析器,预处理器,优化器,查询执行计划这些功能的汇总。只不过Hive是将HQL转换成MapReduce,而传统的数据库引擎将SQL转换成执行引擎可以识别的语言
Hive环境搭建
Hadoop环境在大数据入门章节https://www.jianshu.com/p/10700514e3e0
中已经讲述,这里直接使用该环境。
- 把Hive的安装压缩包hive-1.1.0-cdh5.7.0.tar.gz上传到服务器。
- 解压并配置环境变量
tar -zxvf hive-1.1.0-cdh5.7.0.tar.gz -C ~/apps/
cd ~/app
vi ~/.bash_profile
//在文件中添加变量
export HIVE_HOME=/root/apps/hive-1.1.0-cdh5.7.0
export PATH=$HIVE_HOME/bin:$PATH
// 使环境变量生效
source ~/.bash_profile
- 为hive配置关系型数据库地址信息,用于存储hive的元数据。本人用的是MySQL,此处省略了MySQL的安装。在配置之前一定要安装,安装在本地和虚拟机都行,只要网络通,能访问即可。
cd /root/apps/hive-1.1.0-cdh5.7.0/conf
vi hive-site.xml
//在文件中配置以下信息
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!--mysql数据库地址-->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.30.130:3306/sparksql?createDatabaseIfNotExist=true</value>
</property>
<!-- mysql的driver类 -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<!-- 用户名 -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<!-- 密码 -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>191016</value>
</property>
</configuration>
配置完成后需要拷贝mysql驱动到$HIVE_HOME/lib/ ,下载地址https://dev.mysql.com/downloads/connector/j/
- 在hive-env.sh中配置hadoop的安装路径
cd /root/apps/hive-1.1.0-cdh5.7.0/conf
//默认只提供hive-env的模板,需要复制一份
cp hive-env.sh.template hive-env.sh
vi hive-env.sh
在文件中找到HADOOP_HOME
配置为本地安装的hadoop安装路径
- 启动hive,在启动之前需要启动hadoop环境
cd /root/apps/hive-1.1.0-cdh5.7.0/bin
./hive
启动之后可以登录到mysql查看hive是否创建了元数据信息表。
image.png
Hive的简单使用
用hive来对wordcount案例的实现
- 创建表
//表名称为hive_wordcount,有一个string类型的字段context
create table hive_wordcount(context string);
创建后可在mysql的TBLS元数据表中看到新建的表信息
image.png
- 加载需要统计的文件到hive_wordcount表
//load data local inpath 'filepath ' into table 'tablename'
load data local inpath '/root/data/hello.txt' into table hive_wordcount
- 查询统计词频出现的次数
//lateral view explode():是把每行记录按照指定分隔符进行拆解,我的文本字符之间是以空格做分隔
select word,count(1) from hive_wordcount lateral view explode(split(context,' ')) as word group by word
执行后会生成MapReduce作业在yarn上执行。运行结束后,可统计出每个单词出现的次数。
Hive on Spark
Hive默认使用MapReduce作为执行引擎,即Hive on mr。实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中,所以总体来讲Spark比MapReduce快很多。hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。
SparkSQL概述
什么是SparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了两个编程抽象分别叫做DataFrame和DataSet,它们用于作为分布式SQL查询引擎。从下图可以查看RDD、DataFrames与DataSet的关系。
image为什么引入SparkSQL
在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应用而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是,MapReduce在计算过程中大量的中间磁盘落地过程消耗了大量的磁盘I/O,降低了运行效率。为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现突出的有一个叫做Shark的工具。Shark运行在Spark引擎上,从而使得SQL的查询速度得到了10-100倍的提升。但是,随着Spark的发展,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等),制约了Spark的既定方针,和各个组件的相互集成,所以才有了SparkSQL。
SparkSQL与Hive on Spark
SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。
根据发展历程和和spark的集成程度考虑,用SparkSQL略好于Hive on Spark。所以重点介绍SparkSQL的知识点。
Hive平滑过渡到Spark Sql
SQLContext/HiveContext/SparkSession的使用
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
这里我们简单介绍SparkSession的使用。
- 创建SparkSession
val sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()
上面代码类似于创建一个SparkContext,master设置为local,然后创建了一个SQLContext封装它。如果你想创建hiveContext,可以使用下面的方法来创建SparkSession,以使得它支持Hive:
val sparkSession = SparkSession.builder.master("local").appName("spark session example").enableHiveSupport().getOrCreate()
enableHiveSupport 函数的调用使得SparkSession支持hive,类似于HiveContext。
- 使用SparkSession读取数据
创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据。读取数据后为一个DataFrame对象,可通过DataFrame的函数对数据进行过滤等操作(在后续介绍DataFrame时介绍)。
val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")
spark-shell/spark-sql的使用
- 进入spark/bin下启动spark-shell,成功后运行查询表sql。
spark-shell --master local[2]
spark.sql("show tables").show()
会发现无法访问hive表的数据。此时需要为spark配置hive-site的信息。
- 将hive/conf目录下的hive-site.xml文件拷贝到spark/conf目录下(且添加参数“hive.metastore.schema.verification”的值为“true”,这样做的目的是使得进入spark客户端时不报版本不匹配错误;但是不添加也是可以正常运行的)。由于需要从mysql中访问hive的元数据信息,所以启动时需要指定mysql的连接jar包。启动成功后运行查询表sql
//此处jars后面的参数是mysql的jar包所在的路径
spark-shell --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar
spark.sql("show tables").show()
此时可以发现可以访问到测试hive时所建的hive_wordcount表。
鉴于在spark-shell中每一次使用都需要调用spark.sql方法,故可以使用以下命令打开spark客户端:
spark-sql --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar
这样即可在客户端直接使用sql代码。
thriftserver/beeline的使用
基于Spark的thirftserver来访问hive中的数据,可以让多个客户端连接到同一个服务器端,跑的是同一个application。Thirftserver作为服务端,beeline作为客户端来访问服务端,支持多个客户端同时访问,有助于多个客户端之间数据的共享。而spark-shell、spark-sql启动都是一个spark application,不能共享数据。
- 首先是启动thriftserver服务端:
服务器端是在spark目录下的sbin目录下,但是启动的时候不能直接使用./start-thriftserver.sh进行启动,会报没有设置master, 另外就是Spark SQL是需要和mysql一样操作表的,所以需要连接mysql的驱动jar,因此命令如下:
./start-thriftserver.sh --master local[2] --jars ~/lib/mysql-connector-java-5.1.38.jar
启动完成之后可以在浏览器中进行查看,是否启动成功;在浏览器中输入以下地址即可.(阿里云服务器需要开放安全组端口)
ip:4040
- 启动beeline客户端进行数据的操作:
启动程序在bin目录下,只需要输入以下命令就会连接到数据库:
./beeline -u jdbc:hive2://localhost:10000 -n hadoop
如图,表示连接成功
image.png
-
出现上面成功界面后,下面就是操作数据库的操作,和mysql中类似;
image.png - 启动thriftserver: 默认端口是10000 ,可在启动时通过配置修改端口号。
./start-thriftserver.sh --master local[2] --jars ~/lib/mysql-connector-java-5.1.27-bin.jar --hiveconf hive.server2.thrift.port=14000
jdbc方式编程访问
在使用jdbc开发时,一定要先启动thriftserver。
- 在maven中添加hive-jdbc依赖
<properties>
<maven.compiler.source>1.5</maven.compiler.source>
<maven.compiler.target>1.5</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.0</spark.version>
</properties>
<!--scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--SparkSQL-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--HiveJdbc-->
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
- 开发代码,和标准的JDBC编程一样。
/**
* 通过JDBC的方式访问
*/
object SparkSQLThriftServerApp {
def main(args: Array[String]) {
//创建数据库的连接
Class.forName("org.apache.hive.jdbc.HiveDriver")
//获取JDBC连接
val conn = DriverManager.getConnection("jdbc:hive2://192.168.30.130:10000","root","")
// 创建一个Statement
val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
// 执行sql语句。得到结果集
val rs = pstmt.executeQuery()
//操作结果集
while (rs.next()) {
println("empno:" + rs.getInt("empno") +
" , ename:" + rs.getString("ename") +
" , sal:" + rs.getDouble("sal"))
}
//关闭JDBC对象
rs.close()
pstmt.close()
conn.close()
}
}
DataFrame&Dataset
DataFrame概述
DataFrame是一个分布式数据集,可以理解为关系型数据库一张表,由字段和字段类型、字段值按列组织
DataFrame对比RDD
- 两个都是分布式数据容器,DF理解是一个表格除了RDD数据以外还有Schema(数据结构信息),也支持复杂数据类型(map..)。
- DataFrame提供的API比RDD丰富 支持map filter flatMap .....。
- DF提供Schema信息 有利于优化,性能上好。
- 底层 :Java/Scala 操作RDD的底层是跑在JVM上的,而python操作RDD的底层不跑在JVM上,它有Python Execution,这就导致了所运行的效率完全是不一样的。而DF不是直接到运行环境的,中间还有一层是logicplan,先转换成逻辑执行计划之后,再去进行运行的;所以不管采用什么语言,它的执行效率都是一样的。
DataFrame基本API常用操作
/**
* DataFrame API基本操作
*/
object DataFrameApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
// 将json文件加载成一个dataframe
val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")
// 输出dataframe对应的schema信息
peopleDF.printSchema()
// 输出数据集的前20条记录
peopleDF.show()
//查询某列所有的数据: select name from table
peopleDF.select("name").show()
// 查询某几列所有的数据,并对列进行计算: select name, age+10 as age2 from table
peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
//根据某一列的值进行过滤: select * from table where age>19
peopleDF.filter(peopleDF.col("age") > 19).show()
//根据某一列进行分组,然后再进行聚合操作: select age,count(1) from table group by age
peopleDF.groupBy("age").count().show()
spark.stop()
}
}
DataFrame与RDD互操作
- 反射方式
//添加隐式转换
import spark.implicits._
val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()
//创建一个RDD
val rdd = spark.sparkContext.textFile("xxx")
//转换为DF
val peopleDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF()
case class Info(id: Int, name: String, age: Int)
- 编程方式
使用编程接口,构造一个schema并将其应用在已知的RDD上。
val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()
//创建一个RDD
val rdd = spark.sparkContext.textFile("xxx")
//转换RDD的record为Row
val infoRDD = rdd.map(_.split(",").map(line => Row(line(0).toInt, line(1),line(2).toInt))
//创建一个schema
val structType = StructType(Array(StructField("id",IntergerType,true),StructField("name",StringType,true),StructField("age",IntergerType,true)))
//将schema应用于RDD,转换为DF
val infoDF = spark.createDataFrame(infoRDD,structType)
DataFrame和RDD互操作的两种方式
- 反射:case class 前提:事先知道你的字段、字段类型
- 编程:Row 事先不知道列
- 选型:优先第一种
Dataset概述
从Spark2.0开始,Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。
DataFrame也可以叫Dataset[Row],dataframe每一行的类型是Row(不解析的话无法得知每一行的字段名和对应的字段类型)
拿出dataframe行中特定字段的方法有两个:
getAS方法
testDF.foreach{
line =>
val col1=line.getAs[String]("col1")
val col2=line.getAs[String]("col2")
}
模式匹配
testDF.map{
case Row(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=>
""
}
Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息(可以定义字段名和类型)
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
/**
rdd
("a", 1)
("b", 1)
("a", 1)
* */
val test: Dataset[Coltest]=rdd.map{line=>
Coltest(line._1,line._2)
}.toDS
test.map{
line=>
println(line.col1)
println(line.col2)
}
可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。
Dataset和RDD/DataFrame的转换
- RDD转Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
- Dataset转DataFrame
import spark.implicits._
val testDF = testDS.toDF
- DataFrame转Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便
Spark Sql读取/写入外部数据源
- 操作Parquet文件数据
parquet是默认的格式。从一个parquet文件读取数据的代码如下:
val usersDF = spark.read.load("src/main/resources/users.parquet")
spark.read返回DataFrameReader对象,其load方法加载文件中的数据并返回DataFrame对象
将一个DataFrame写到parquet文件
usersDF.write.save("output/parquet/")
DataFrame#write()方法返回DataFrameWriter对象实例,save方法将数据持久化为parquet格式的文件。save的参数是一个目录,而且要求最底层的目录是不存在的。
另外一种写的方式是:
peopleDF.write.parquet("output/parquet/")
- 操作CSV文件
spark.read.option("header", true).format("csv").load("output/csv/")
另外一种简化的读法:
spark.read.option("header", true).csv("output/csv/")
其中的option("header", true)就是告诉读入器这个文件是有表头的。
将DataFrame写入到csv文件时也需要注意表头,将表头也写入文件的方式:
peopleDF.write.option("header", true).format("csv").save("output/csv/")
不写表头,只写数据的方式:
peopleDF.write.format("csv").save("output/csv/")
另外一种简化的写法是:
peopleDF.write.csv("output/csv/")
- 操作JSON
val peopleDF = spark.read.format("json").load(path)
还有一种简化的方式,其本质还是上述的代码:
val peopleDF = spark.read.json(path)
将一个DataFrame写到json文件的方式:
peopleDF.write.format("json").save("output/json/")
另外一种简略的写法:
peopleDF.write.json("output/json/")
- 操作Hive表数据
// 加载Hive表数据
val hiveDF = spark.table("emp")
将DataFrame的数据写入表
tableDF.write.saveAsTable("src_bak")
如果要写入一张已经存在的表,需要按照下面的方式:
tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")
- 操作MySQL表数据
spark可以直接通过jdbc读取关系型数据库中指定的表。有两种读取的方式,一种是将所有的参数都作为option一条条设置:
val url = "jdbc:mysql://localhost:3306/sparksql?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
val jdbcDF = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", "vulcanus_ljl.data_dict")
.option("user", "vulcanus_ljl")
.option("password", "mypassword")
.load()
另一种是预先将参数封装到Properties对象里:
val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
val connectionProperties = new Properties()
connectionProperties.put("user", "vulcanus_ljl")
connectionProperties.put("password", "mypassword")
val jdbcDF2 = spark.read
.jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)
spark还可以通过jdbc将DataFrame写入到一张新表(表必须不存在),写入的方式同样分为两种:
jdbcDF.write
.format("jdbc")
.option("url", url)
.option("dbtable", "vulcanus_ljl.data_dict_temp1")
.option("user", "vulcanus_ljl")
.option("password", "mypassword")
.option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 没有指定的字段使用默认的类型
.save()
和
jdbcDF2.write
.jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)
其中,url和connectionProperties的内容同上文读取时的设置。
写入时可以通过createTableColumnTypes设置指定多个字段的类型,其他没有指定的字段会使用默认的类型。
- 综合案例,从hive中读出员工表,在MySQL中读出部门表,在查询员工表中部门编号在部门表中的员工。
/**
* 使用外部数据源综合查询Hive和MySQL的表数据
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("HiveMySQLApp")
.master("local[2]").getOrCreate()
// 加载Hive表数据
val hiveDF = spark.table("emp")
// 加载MySQL表数据
val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
// JOIN
val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
mysqlDF.col("deptno"), mysqlDF.col("dname")).show
spark.stop()
}
}
Spark Sql愿景
-
写更少的代码
-- 从wordcount角度看:
MapReduce(代码量最多)--->hive(代码量少)---->Spark core(代码量更少,但可读性差)----->Spark SQL(代码量少,可读性好,性能更好)
-- 从外部数据源角度看:
为文件输入输出提供了访问的接口
-- 从schema推导的角度来看:
可以自动推导数据类型,对于数据类型不对的数据,很方便转换,即数据兼容性更好 -
读更少的数据
分区、压缩、pushdown、谓词下压、过滤 -
将优化交给底层
底层优化器自动优化程序,即使是小白也能写出高效的代码。
Spark Sql程序优化项
-
存储格式的选择
采取行式还是列式存储?
列存储写入时次数多,损耗时间多
反过来查询的时候较快 -
压缩格式的选择
考虑压缩速度和压缩文件的分割性
压缩能够较少存储空间、提高数据传输速度
Spark中默认的压缩格式是“snappy” -
代码的优化:
选择的高性能的算子:
foreachPartition => partitionOfRecords.foreach 获得每一条数据
分区的好处是把partition所有的数据先保存到list当中去,然后我们在插入MySQL的时候就可以结合pstmt的批处理,一次过把整个分区数据写进去 -
复用已有的数据:
在项目中,如果同时实现多个功能,在计算时观察每个功能间是否有重叠产生的数据,若有的话把相应的数据提取出来生成,所有的功能实现都能共用(相当于做一个缓存,把中间数据cache ) -
参数的优化:
并行度:spark.sql.shuffle.partitions
默认的是200,配置的是partitions的数量,对应了task的数量
若觉得运行得太慢,则需要吧这个值调大
在conf里面改(YARN启动时) -
关闭分区字段类型推测
默认为开启,若开启之后系统就会自动推测分区字段的类型
关闭后能提升性能