大数据,机器学习,人工智能大数据大数据 爬虫Python AI Sql

Hive平滑过渡到Spark Sql

2019-05-13  本文已影响1人  董二弯

Hive概述

Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,将类 SQL 语句转换为 MapReduce 任务执行。


image.png

Hive产生背景

Hive整体架构

image.png

Hive架构包括如下组件:CLI(command line interface)、JDBC/ODBC、Thrift Server、Hive WEB Interface(HWI)、metastore和Driver(Complier、Optimizer和Executor)

执行流程示意图

image.png

Hive 将通过CLI接入,JDBC/ODBC接入,或者HWI接入的相关查询,通过Driver(Complier、Optimizer和Executor),进行编译,分析优化,最后变成可执行的MapReduce。
Hive 功能有点类似传统的数据库引擎(如mysql),解析器,预处理器,优化器,查询执行计划这些功能的汇总。只不过Hive是将HQL转换成MapReduce,而传统的数据库引擎将SQL转换成执行引擎可以识别的语言

Hive环境搭建

Hadoop环境在大数据入门章节https://www.jianshu.com/p/10700514e3e0
中已经讲述,这里直接使用该环境。

tar -zxvf hive-1.1.0-cdh5.7.0.tar.gz -C ~/apps/
cd ~/app
vi  ~/.bash_profile
     //在文件中添加变量
    export HIVE_HOME=/root/apps/hive-1.1.0-cdh5.7.0
    export PATH=$HIVE_HOME/bin:$PATH
// 使环境变量生效
source ~/.bash_profile
cd /root/apps/hive-1.1.0-cdh5.7.0/conf
vi hive-site.xml
 //在文件中配置以下信息

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!--mysql数据库地址-->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://192.168.30.130:3306/sparksql?createDatabaseIfNotExist=true</value>
    </property>
<!-- mysql的driver类 -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
<!-- 用户名 -->
  <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
<!-- 密码 -->
   <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>191016</value>
    </property>
</configuration>

配置完成后需要拷贝mysql驱动到$HIVE_HOME/lib/ ,下载地址https://dev.mysql.com/downloads/connector/j/

cd /root/apps/hive-1.1.0-cdh5.7.0/conf
//默认只提供hive-env的模板,需要复制一份
cp hive-env.sh.template hive-env.sh
vi hive-env.sh
   在文件中找到HADOOP_HOME
   配置为本地安装的hadoop安装路径
cd /root/apps/hive-1.1.0-cdh5.7.0/bin
./hive

启动之后可以登录到mysql查看hive是否创建了元数据信息表。


image.png

Hive的简单使用

用hive来对wordcount案例的实现

//表名称为hive_wordcount,有一个string类型的字段context
create table hive_wordcount(context string);

创建后可在mysql的TBLS元数据表中看到新建的表信息


image.png
//load data local inpath 'filepath ' into table 'tablename'
load data local inpath '/root/data/hello.txt' into table hive_wordcount
//lateral view explode():是把每行记录按照指定分隔符进行拆解,我的文本字符之间是以空格做分隔
select word,count(1) from hive_wordcount lateral view explode(split(context,' ')) as word group by word

执行后会生成MapReduce作业在yarn上执行。运行结束后,可统计出每个单词出现的次数。

Hive on Spark

Hive默认使用MapReduce作为执行引擎,即Hive on mr。实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中,所以总体来讲Spark比MapReduce快很多。hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

SparkSQL概述

什么是SparkSQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了两个编程抽象分别叫做DataFrame和DataSet,它们用于作为分布式SQL查询引擎。从下图可以查看RDD、DataFrames与DataSet的关系。

image

为什么引入SparkSQL

在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应用而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是,MapReduce在计算过程中大量的中间磁盘落地过程消耗了大量的磁盘I/O,降低了运行效率。为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现突出的有一个叫做Shark的工具。Shark运行在Spark引擎上,从而使得SQL的查询速度得到了10-100倍的提升。但是,随着Spark的发展,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等),制约了Spark的既定方针,和各个组件的相互集成,所以才有了SparkSQL。

SparkSQL与Hive on Spark

SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。
根据发展历程和和spark的集成程度考虑,用SparkSQL略好于Hive on Spark。所以重点介绍SparkSQL的知识点。

Hive平滑过渡到Spark Sql

SQLContext/HiveContext/SparkSession的使用

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
  SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
这里我们简单介绍SparkSession的使用。

val sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()

上面代码类似于创建一个SparkContext,master设置为local,然后创建了一个SQLContext封装它。如果你想创建hiveContext,可以使用下面的方法来创建SparkSession,以使得它支持Hive:

val sparkSession = SparkSession.builder.master("local").appName("spark session example").enableHiveSupport().getOrCreate()

enableHiveSupport 函数的调用使得SparkSession支持hive,类似于HiveContext。

val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")

spark-shell/spark-sql的使用

spark-shell --master local[2]
spark.sql("show tables").show()

会发现无法访问hive表的数据。此时需要为spark配置hive-site的信息。

//此处jars后面的参数是mysql的jar包所在的路径
spark-shell --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar
spark.sql("show tables").show()

此时可以发现可以访问到测试hive时所建的hive_wordcount表。

鉴于在spark-shell中每一次使用都需要调用spark.sql方法,故可以使用以下命令打开spark客户端:

spark-sql --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar

这样即可在客户端直接使用sql代码。

thriftserver/beeline的使用

基于Spark的thirftserver来访问hive中的数据,可以让多个客户端连接到同一个服务器端,跑的是同一个application。Thirftserver作为服务端,beeline作为客户端来访问服务端,支持多个客户端同时访问,有助于多个客户端之间数据的共享。而spark-shell、spark-sql启动都是一个spark application,不能共享数据。

./start-thriftserver.sh --master local[2] --jars ~/lib/mysql-connector-java-5.1.38.jar

启动完成之后可以在浏览器中进行查看,是否启动成功;在浏览器中输入以下地址即可.(阿里云服务器需要开放安全组端口)
ip:4040

./beeline -u jdbc:hive2://localhost:10000 -n hadoop

如图,表示连接成功


image.png
./start-thriftserver.sh  --master local[2] --jars ~/lib/mysql-connector-java-5.1.27-bin.jar  --hiveconf hive.server2.thrift.port=14000

jdbc方式编程访问

在使用jdbc开发时,一定要先启动thriftserver。

<properties>
        <maven.compiler.source>1.5</maven.compiler.source>
        <maven.compiler.target>1.5</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.0</spark.version>
 </properties>

<!--scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

 <!--SparkSQL-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>    
       </dependency>

 <!--Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

<!--HiveJdbc-->
        <dependency>
            <groupId>org.spark-project.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1.spark2</version>
       </dependency>
/**
 *  通过JDBC的方式访问
 */
object SparkSQLThriftServerApp {

  def main(args: Array[String]) {
    //创建数据库的连接
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    //获取JDBC连接
    val conn = DriverManager.getConnection("jdbc:hive2://192.168.30.130:10000","root","")
    // 创建一个Statement
    val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    //  执行sql语句。得到结果集
    val rs = pstmt.executeQuery()
    //操作结果集
    while (rs.next()) {
      println("empno:" + rs.getInt("empno") +
        " , ename:" + rs.getString("ename") +
        " , sal:" + rs.getDouble("sal"))
    }
    //关闭JDBC对象
    rs.close()
    pstmt.close()
    conn.close()
  }
}

DataFrame&Dataset

DataFrame概述

DataFrame是一个分布式数据集,可以理解为关系型数据库一张表,由字段和字段类型、字段值按列组织

DataFrame对比RDD

DataFrame基本API常用操作

/**
 * DataFrame API基本操作
 */
object DataFrameApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

    // 将json文件加载成一个dataframe
    val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")

    // 输出dataframe对应的schema信息
    peopleDF.printSchema()

    // 输出数据集的前20条记录
    peopleDF.show()

    //查询某列所有的数据: select name from table
    peopleDF.select("name").show()

    // 查询某几列所有的数据,并对列进行计算: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()

    //根据某一列的值进行过滤: select * from table where age>19
    peopleDF.filter(peopleDF.col("age") > 19).show()

    //根据某一列进行分组,然后再进行聚合操作: select age,count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }

}

DataFrame与RDD互操作

//添加隐式转换
import spark.implicits._
val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()

//创建一个RDD
val rdd = spark.sparkContext.textFile("xxx")
//转换为DF
val peopleDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF()

case class Info(id: Int, name: String, age: Int)
val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()
//创建一个RDD
val rdd = spark.sparkContext.textFile("xxx")

//转换RDD的record为Row
val infoRDD = rdd.map(_.split(",").map(line => Row(line(0).toInt, line(1),line(2).toInt))

//创建一个schema
val structType = StructType(Array(StructField("id",IntergerType,true),StructField("name",StringType,true),StructField("age",IntergerType,true)))

//将schema应用于RDD,转换为DF
val infoDF = spark.createDataFrame(infoRDD,structType)

DataFrame和RDD互操作的两种方式

Dataset概述

从Spark2.0开始,Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。
DataFrame也可以叫Dataset[Row],dataframe每一行的类型是Row(不解析的话无法得知每一行的字段名和对应的字段类型)
拿出dataframe行中特定字段的方法有两个:
getAS方法

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

模式匹配

testDF.map{
      case Row(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }

Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息(可以定义字段名和类型)

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
/**
      rdd
      ("a", 1)
      ("b", 1)
      ("a", 1)
      * */
val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS
test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。

Dataset和RDD/DataFrame的转换

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

import spark.implicits._
val testDF = testDS.toDF
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

Spark Sql读取/写入外部数据源

val usersDF = spark.read.load("src/main/resources/users.parquet")

spark.read返回DataFrameReader对象,其load方法加载文件中的数据并返回DataFrame对象

将一个DataFrame写到parquet文件

usersDF.write.save("output/parquet/")

DataFrame#write()方法返回DataFrameWriter对象实例,save方法将数据持久化为parquet格式的文件。save的参数是一个目录,而且要求最底层的目录是不存在的。

另外一种写的方式是:

peopleDF.write.parquet("output/parquet/")
spark.read.option("header", true).format("csv").load("output/csv/")
另外一种简化的读法:
spark.read.option("header", true).csv("output/csv/")

其中的option("header", true)就是告诉读入器这个文件是有表头的。

将DataFrame写入到csv文件时也需要注意表头,将表头也写入文件的方式:

peopleDF.write.option("header", true).format("csv").save("output/csv/")

不写表头,只写数据的方式:

peopleDF.write.format("csv").save("output/csv/")

另外一种简化的写法是:

peopleDF.write.csv("output/csv/")
val peopleDF = spark.read.format("json").load(path)
还有一种简化的方式,其本质还是上述的代码:
val peopleDF = spark.read.json(path)

将一个DataFrame写到json文件的方式:

peopleDF.write.format("json").save("output/json/")
另外一种简略的写法:
peopleDF.write.json("output/json/")
// 加载Hive表数据
    val hiveDF = spark.table("emp")

将DataFrame的数据写入表

tableDF.write.saveAsTable("src_bak")
如果要写入一张已经存在的表,需要按照下面的方式:
tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")
val url = "jdbc:mysql://localhost:3306/sparksql?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "vulcanus_ljl.data_dict")
  .option("user", "vulcanus_ljl")
  .option("password", "mypassword")
  .load()

另一种是预先将参数封装到Properties对象里:

val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"

val connectionProperties = new Properties()
connectionProperties.put("user", "vulcanus_ljl")
connectionProperties.put("password", "mypassword")

val jdbcDF2 = spark.read
  .jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)

spark还可以通过jdbc将DataFrame写入到一张新表(表必须不存在),写入的方式同样分为两种:

jdbcDF.write
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "vulcanus_ljl.data_dict_temp1")
  .option("user", "vulcanus_ljl")
  .option("password", "mypassword")
  .option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 没有指定的字段使用默认的类型
  .save()
和
jdbcDF2.write
  .jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)
其中,url和connectionProperties的内容同上文读取时的设置。
写入时可以通过createTableColumnTypes设置指定多个字段的类型,其他没有指定的字段会使用默认的类型。
/**
 * 使用外部数据源综合查询Hive和MySQL的表数据
 */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("HiveMySQLApp")
      .master("local[2]").getOrCreate()

    // 加载Hive表数据
    val hiveDF = spark.table("emp")

    // 加载MySQL表数据
    val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()

    // JOIN
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show

    resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
      mysqlDF.col("deptno"), mysqlDF.col("dname")).show

    spark.stop()
  }
}

Spark Sql愿景

Spark Sql程序优化项

上一篇下一篇

猜你喜欢

热点阅读