Spark SQL

2019-03-12  小月半会飞

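I. What is Spark SQL

Spark SQL is the Spark module for processing structured data. It provides two programming abstractions, DataFrame and Dataset, and also serves as a distributed SQL query engine that translates traditional SQL into big-data computations. The figure below shows the relationship between RDD, DataFrame, and Dataset.

[Figure: relationship between RDD, DataFrame, and Dataset (image.png)]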

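II. Hive and Spark SQL

Hive converts Hive SQL into MapReduce jobs and submits them to the cluster, which greatly reduces the effort of writing MapReduce programs by hand. The MapReduce execution model, however, is relatively slow. Spark SQL emerged to address this: it converts SQL into RDD operations and submits those to the cluster, which typically runs much faster. By analogy: Hive: SQL --> MapReduce; Spark SQL: SQL --> RDD.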
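To make the "SQL --> RDD" idea concrete, here is a minimal sketch written as a standalone script. It assumes a local SparkSession; the in-memory rows and the view name dept_demo are made up for illustration and are not part of the article's data set.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; on a cluster the master comes from spark-submit
val spark = SparkSession.builder().master("local[*]").appName("plan-demo").getOrCreate()
import spark.implicits._

// Hypothetical in-memory data, registered as a temporary view
val demo = Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES")).toDF("deptno", "dname")
demo.createOrReplaceTempView("dept_demo")

val result = spark.sql("select dname from dept_demo where deptno > 10")

// Prints the physical plan that Spark SQL generated from the SQL text
result.explain()

// The query result is also available as an RDD of rows
val names = result.rdd.map(_.getString(0)).collect().sorted
```

The output of explain() depends on the Spark version, so it is not reproduced here.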

III. Test data

We use two CSV files as part of the test data:

dept.csv:

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

emp.csv:

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

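Put both CSV files into the /input/ directory on HDFS (hdfs://hadoop3/input/) so they can be used later.
Start HDFS first, then check that the files are in place: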

start-dfs.sh
[root@bigdata111 ~]# hdfs dfs -ls /input
-rw-r--r--   3 root supergroup         80 2019-03-01 05:05 /input/dept.csv
-rw-r--r--   3 root supergroup        603 2019-03-01 05:05 /input/emp.csv

IV. Creating a DataFrame

1. Start the Spark cluster

spark-start-all.sh

2. Start spark-shell

spark-shell

3. Method 1: define the table with a case class

scala> val rdd = sc.textFile("hdfs://hadoop3/input/dept.csv")
scala> val rdd2 = rdd.filter(_.length > 0).map(_.split(","))
scala> case class Dept(deptno: Int, dname: String, loc: String)
scala> val deptrdd = rdd2.map(x => Dept(x(0).toInt, x(1), x(2)))
scala> val deptdf = deptrdd.toDF

Output:
Equivalent SQL: select * from dept

scala> deptdf.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

Equivalent SQL: desc dept

scala> deptdf.printSchema
root
 |-- deptno: integer (nullable = false)
 |-- dname: string (nullable = true)
 |-- loc: string (nullable = true)
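The same case class can also back a typed Dataset, the second abstraction mentioned in section I. The sketch below is written as a standalone script and assumes a local SparkSession; it keeps the dept.csv rows in memory so it does not depend on HDFS.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class Dept(deptno: Int, dname: String, loc: String)

val spark = SparkSession.builder().master("local[*]").appName("dataset-demo").getOrCreate()
import spark.implicits._

// Same rows as dept.csv, held in memory for the sake of the example
val lines = Seq(
  "10,ACCOUNTING,NEW YORK",
  "20,RESEARCH,DALLAS",
  "30,SALES,CHICAGO",
  "40,OPERATIONS,BOSTON")

// toDS() yields a Dataset[Dept] instead of an untyped DataFrame
val deptDS: Dataset[Dept] = lines
  .map(_.split(","))
  .map(x => Dept(x(0).toInt, x(1), x(2)))
  .toDS()

// Typed operations: the lambdas receive Dept objects, not generic Rows
val bigDeptNames = deptDS.filter(d => d.deptno >= 30).map(d => d.dname).collect()
```

Compared with toDF, field access such as d.deptno is checked at compile time.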

4. Method 2: create a DataFrame with the SparkSession object

scala> val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))  // read from the local Linux file system
scala> val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(","))  // or read from HDFS
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
scala> val myschema = StructType(List(
         StructField("empno", DataTypes.IntegerType),
         StructField("ename", DataTypes.StringType),
         StructField("job", DataTypes.StringType),
         StructField("mgr", DataTypes.StringType),
         StructField("hiredate", DataTypes.StringType),
         StructField("sal", DataTypes.IntegerType),
         StructField("comm", DataTypes.StringType),
         StructField("deptno", DataTypes.IntegerType)))  // define the schema as a StructType

scala> val rowRDD = lines.map(x => Row(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))  // map each input line to a Row
scala> val df = spark.createDataFrame(rowRDD, myschema)  // create the table with SparkSession.createDataFrame

Output:

scala> df.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|    |    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|    |    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|    |    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|    |    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|    |    20|
| 7839|  KING|PRESIDENT|    |1981/11/17|5000|    |    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|    |    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|    |    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|    |    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|    |    10|
+-----+------+---------+----+----------+----+----+------+
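The schema above keeps mgr and comm as strings because those fields are empty for some employees, and calling toInt on an empty string would throw. One possible alternative, sketched here in plain Scala, is to parse them into Option[Int]. The names Emp, optInt, and parseEmp are hypothetical helpers, not part of the original code.

```scala
// One employee record; mgr and comm are optional because the CSV leaves them empty
case class Emp(empno: Int, ename: String, job: String, mgr: Option[Int],
               hiredate: String, sal: Int, comm: Option[Int], deptno: Int)

// Returns None for an empty field, Some(number) otherwise
def optInt(s: String): Option[Int] =
  if (s.trim.isEmpty) None else Some(s.trim.toInt)

// A limit of -1 makes split keep empty fields, including trailing ones
def parseEmp(line: String): Emp = {
  val x = line.split(",", -1)
  Emp(x(0).toInt, x(1), x(2), optInt(x(3)), x(4), x(5).toInt, optInt(x(6)), x(7).toInt)
}

val king = parseEmp("7839,KING,PRESIDENT,,1981/11/17,5000,,10")
val allen = parseEmp("7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30")
```

With import spark.implicits._ in scope, a collection or RDD of Emp can be converted with toDF, and the Option[Int] fields become nullable integer columns.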

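5. Method 3: read a structured file (JSON, CSV, etc.) directly; this is the simplest way

Preparation:
The directory /opt/modules/app/spark/examples/src/main/resources contains ready-made samples, so we simply upload one of the JSON samples to HDFS: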

[root@hadoop3 resources]# pwd
/opt/modules/app/spark/examples/src/main/resources
[root@hadoop3 resources]# hadoop fs -put ./people.json /input/
scala> val peopleDF = spark.read.json("hdfs://hadoop3/input/people.json")
19/03/01 06:16:51 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

Output:

scala> peopleDF.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
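The heading of this method also mentions CSV, so here is a hedged sketch of reading a headerless CSV file with an explicit schema. To stay self-contained it writes the dept.csv rows to a temporary local file; on the cluster the path would instead be the HDFS location used earlier. It assumes a local SparkSession and is written as a standalone script.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("csv-demo").getOrCreate()

// Write the dept.csv rows to a temporary local file so the sketch is self-contained
val csvPath = Files.createTempFile("dept", ".csv")
Files.write(csvPath,
  "10,ACCOUNTING,NEW YORK\n20,RESEARCH,DALLAS\n30,SALES,CHICAGO\n40,OPERATIONS,BOSTON\n"
    .getBytes(StandardCharsets.UTF_8))

// The file has no header line, so column names and types come from an explicit schema
val deptSchema = StructType(Seq(
  StructField("deptno", IntegerType),
  StructField("dname", StringType),
  StructField("loc", StringType)))

val deptDF = spark.read.schema(deptSchema).csv(csvPath.toUri.toString)
deptDF.show()
```

Passing a schema avoids the extra pass over the data that schema inference would need, and it gives deptno an integer type rather than string.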

V. Working with DataFrames

1. Add dependencies

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.neusoft</groupId>
  <artifactId>sparkdemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>sparkdemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.3.1</spark.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>

  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2. Query a table

Equivalent SQL: select * from <table>

package com.neusoft

import org.apache.spark.sql.SparkSession

object Sparksqldemo2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("test")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()

    val df = spark.read.json("hdfs://192.168.159.133:8020/input/people.json")
    df.show()
  }
}

3. DSL operations on a DataFrame

1) Show all employee records === select * from empDF; (empDF in the SQL stands for the employee DataFrame df created above)

df.show

2) Select the name and department number of every employee; the result is the same with or without the $ prefix === select ename,deptno from empDF;

df.select("ename","deptno").show
df.select($"ename",$"deptno").show

3) Select every employee's name and salary, plus the salary raised by 100 === select ename,sal,sal+100 from empDF;

df.select($"ename", $"sal", $"sal" + 100).show

4) Select the employees whose salary is greater than 2000 === select * from empDF where sal>2000;

df.filter($"sal" > 2000).show

5) Grouping === select deptno,count(*) from empDF group by deptno;

scala> df.groupBy("deptno").count.show
scala> df.groupBy("deptno").avg().show
scala> df.groupBy($"deptno").max().show
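Each grouping call above computes one kind of aggregate at a time. As a sketch of how several aggregates can be combined in one pass, the example below uses agg with functions from org.apache.spark.sql.functions. It assumes a local SparkSession and uses only the (ename, sal, deptno) columns of emp.csv, held in memory; the column aliases n, total, average, and highest are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max, sum}

val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
import spark.implicits._

// (ename, sal, deptno) taken from emp.csv
val emp = Seq(
  ("SMITH", 800, 20), ("ALLEN", 1600, 30), ("WARD", 1250, 30), ("JONES", 2975, 20),
  ("MARTIN", 1250, 30), ("BLAKE", 2850, 30), ("CLARK", 2450, 10), ("SCOTT", 3000, 20),
  ("KING", 5000, 10), ("TURNER", 1500, 30), ("ADAMS", 1100, 20), ("JAMES", 950, 30),
  ("FORD", 3000, 20), ("MILLER", 1300, 10)
).toDF("ename", "sal", "deptno")

// Head count, salary total, average, and maximum per department, in one aggregation
val stats = emp.groupBy($"deptno")
  .agg(
    count("*").as("n"),
    sum($"sal").as("total"),
    avg($"sal").as("average"),
    max($"sal").as("highest"))
  .orderBy($"deptno")

stats.show()
```

This corresponds to select deptno, count(*), sum(sal), avg(sal), max(sal) from emp group by deptno order by deptno.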


4. SQL operations on a DataFrame

(1) Prerequisite: the DataFrame must first be registered as a table or view

df.createOrReplaceTempView("emp")

(2) Run queries through the SparkSession

spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show

(3) Compute the total salary of each department

spark.sql("select deptno,sum(sal) from emp group by deptno").show
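Once both data sets are registered as views, the department table can be joined in as well. The following is a minimal sketch, assuming a local SparkSession; it uses a few rows from emp.csv and dept.csv held in memory, and the view names emp_demo and dept_demo are chosen here so they do not clash with the emp view above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()
import spark.implicits._

// A few (ename, sal, deptno) rows from emp.csv
Seq(("SMITH", 800, 20), ("ALLEN", 1600, 30), ("CLARK", 2450, 10),
    ("KING", 5000, 10), ("SCOTT", 3000, 20))
  .toDF("ename", "sal", "deptno")
  .createOrReplaceTempView("emp_demo")

// (deptno, dname) rows from dept.csv
Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES"), (40, "OPERATIONS"))
  .toDF("deptno", "dname")
  .createOrReplaceTempView("dept_demo")

// Total salary per department name; the inner join drops departments without employees
val totals = spark.sql(
  """select d.dname, sum(e.sal) as total
    |from emp_demo e join dept_demo d on e.deptno = d.deptno
    |group by d.dname
    |order by d.dname""".stripMargin)

totals.show()
```

OPERATIONS does not appear in the result because no employee row references department 40; a left join from dept_demo would keep it with a null total.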

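VI. Views

Before a DataFrame can be queried with SQL, it must be registered as a table or view:

df.createOrReplaceTempView("emp")

A view created this way is temporary and scoped to one session. If you want a temporary view that is shared across sessions and stays available for the lifetime of the application, create a global temporary view. Remember to reference it with the global_temp prefix, because global temporary views are bound to the system-reserved database global_temp.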

① Create a regular view and a global view

df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")

② Run the queries in the current session; both return results.
scala> spark.sql("select * from emp1").show
scala> spark.sql("select * from global_temp.emp2").show

③ Open a new session and run the same queries
scala> spark.newSession.sql("select * from emp1").show   // fails: emp1 is only visible in the session that created it
scala> spark.newSession.sql("select * from global_temp.emp2").show
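The three steps can be combined into one small script. This is a sketch under assumptions: a local SparkSession, two made-up rows, and Try used to capture the expected failure instead of letting it abort the script. It calls createOrReplaceGlobalTempView (available since Spark 2.2) so the script can be rerun without an "already exists" error.

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("view-demo").getOrCreate()
import spark.implicits._

// Two illustrative rows; any DataFrame would do
val df = Seq(("SMITH", 20), ("KING", 10)).toDF("ename", "deptno")

// Session-scoped view: visible only in the session that created it
df.createOrReplaceTempView("emp1")
// Application-scoped view, registered in the global_temp database
df.createOrReplaceGlobalTempView("emp2")

// A second session that shares the SparkContext but not the temporary views
val other = spark.newSession()

// Fails with an analysis error because emp1 does not exist in the new session
val localFromOther = Try(other.sql("select * from emp1").count())
// Succeeds because global temporary views are shared across sessions
val globalFromOther = Try(other.sql("select * from global_temp.emp2").count())
```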


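VII. Using data sources

1. Generic Load/Save functions

(*) What is a Parquet file?
Parquet is a file type that uses a columnar storage format.

Parquet is the default data source of Spark SQL; the default can be changed with the spark.sql.sources.default setting.

(*) Generic Load/Save functions

Compare the following statements. Both load the same JSON file; json(...) is shorthand for format("json").load(...):

scala> val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
scala> val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")

Inspect the schema and data (userDF here is a previously loaded DataFrame that has name and favorite_color columns):

scala> userDF.printSchema
scala> userDF.show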

scala>userDF.select("name","favorite_color").write.save("/root/temp/result1")
scala>userDF.select("name","favorite_color").write.format("csv").save("/root/temp/result2")
scala>userDF.select("name","favorite_color").write.csv("/root/temp/result3")


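(*) Specifying the file format explicitly: loading JSON
Loading the file directly fails, because load() assumes the default format (Parquet):
val usersDF = spark.read.load("/root/resources/people.json")
Specify the format explicitly instead:
val usersDF = spark.read.format("json").load("/root/resources/people.json")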

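(*) Save modes
A SaveMode can be passed when saving; it defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. In addition, with Overwrite the existing data is deleted before the new data is written. The modes are:
SaveMode.ErrorIfExists (default): if the target table (or output path) already exists, an exception is thrown and nothing is saved.
SaveMode.Append: if the target already exists, the data is appended to it; if not, the target is created first and the data is then inserted.
SaveMode.Overwrite: the existing target and its data are deleted, the target is recreated, and the new data is inserted.
SaveMode.Ignore: if the target does not exist, it is created and the data is saved; if it does exist, the save is skipped without raising an error.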

Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> fails, because /root/result/parquet1 already exists

usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
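The four modes can be tried side by side with the SaveMode enumeration instead of the string form. The sketch below assumes a local SparkSession, the default Parquet data source, and two made-up rows; it writes into a fresh temporary directory rather than /root/result.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("savemode-demo").getOrCreate()
import spark.implicits._

// Made-up rows with the same column names as the demo above
val users = Seq(("Ann", "red"), ("Bob", "blue")).toDF("name", "favorite_color")

// A path that does not exist yet, inside a fresh temporary directory
val out = Files.createTempDirectory("savemode").resolve("users").toUri.toString

// Default format (Parquet) and default mode (ErrorIfExists): succeeds on a new path
users.write.save(out)
// The same call again fails because the path now exists
val second = Try(users.write.save(out))

// Append adds the two rows a second time
users.write.mode(SaveMode.Append).save(out)
val afterAppend = spark.read.load(out).count()

// Ignore leaves the existing data untouched and raises no error
users.write.mode(SaveMode.Ignore).save(out)

// Overwrite replaces whatever was there
users.write.mode(SaveMode.Overwrite).save(out)
val afterOverwrite = spark.read.load(out).count()
```

mode("overwrite") and mode(SaveMode.Overwrite) are equivalent; the enumeration has the advantage that a misspelled mode is caught at compile time.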

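2. Reading and writing MySQL

2.1 JDBC

Spark SQL can create a DataFrame by reading from a relational database over JDBC. After a series of computations on the DataFrame, the result can be written back to the relational database.

2.1.1 Loading data from MySQL (Spark shell)

Start the Spark shell; the MySQL JDBC driver jar must be specified: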

spark-shell --master spark://hadoop1:7077 --jars mysql-connector-java-5.1.35-bin.jar --driver-class-path mysql-connector-java-5.1.35-bin.jar

Load data from MySQL:

val jdbcDF = spark.read.format("jdbc").options(
     Map("url" -> "jdbc:mysql://hadoop1:3306/bigdata",
         "driver" -> "com.mysql.jdbc.Driver",
         "dbtable" -> "person", // a subquery also works: "dbtable" -> "(select * from person where id = 12) as person"
         "user" -> "root",
         "password" -> "123456")
     ).load()

Run the query:

jdbcDF.show()
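For completeness, here is a hedged sketch of the DataFrame-based round trip described in 2.1: read a table over JDBC, transform it, and write the result back. The helper names mysqlProps and copyFiltered, the target table person_copy, and the filter condition are all hypothetical. Only the helper definitions run when the block is loaded; the actual call is left commented out because it needs a reachable MySQL server and the driver jar on the classpath.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Connection settings; the driver class mirrors the options used above
def mysqlProps(user: String, password: String): Properties = {
  val p = new Properties()
  p.setProperty("user", user)
  p.setProperty("password", password)
  p.setProperty("driver", "com.mysql.jdbc.Driver")
  p
}

// Reads `source`, keeps the rows matching `condition`, and appends them to `target`
def copyFiltered(spark: SparkSession, url: String, source: String, target: String,
                 condition: String, props: Properties): Unit = {
  val df: DataFrame = spark.read.jdbc(url, source, props)
  df.filter(condition)
    .write
    .mode(SaveMode.Append)
    .jdbc(url, target, props)
}

// Example call (hypothetical table name and condition):
// copyFiltered(spark, "jdbc:mysql://hadoop1:3306/bigdata", "person", "person_copy",
//   "id > 10", mysqlProps("root", "123456"))
```

With SaveMode.Append the rows are added to the target table if it exists; Spark creates the table when it does not.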

2.1.2 Writing data to MySQL (packaged as a jar)

POM dependencies: the same pom.xml shown earlier in this article (section V, step 1); it already includes the Spark Streaming, Kafka, and MySQL connector dependencies.

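Write the program (this example uses Spark Streaming to read records from Kafka and writes each record to MySQL over plain JDBC):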

package com.neusoft
import java.sql
import java.sql.DriverManager
import java.util.Date

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
/**
  * Created by Administrator on 2019/3/7.
  */
object SparkDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("hdfs://hadoop3:8020/spark_check_point")
    // Kafka topic set: several topics can be subscribed to (when passed via args, separate them with commas)
    val topicsSet = Set("ss_kafka")
    // Kafka parameters: the list of brokers
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.159.133:9092,192.168.159.130:9092,192.168.159.134:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    print("---------:" +messages)
    val lines = messages.map(_._2)
    lines.foreachRDD(rdd => {
      // Writes all records of one partition to MySQL over a single connection
      def func(records: Iterator[String]): Unit = {
        var conn: sql.Connection = null
        var stmt: sql.PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/test"
          val user = "root"
          val password = "root"  // replace with your own MySQL password
          conn = DriverManager.getConnection(url, user, password)
          // Prepare the statement once and reuse it for every record
          val insertSql = "insert into location(time,latitude,longtitude) values (?,?,?)"
          stmt = conn.prepareStatement(insertSql)
          records.foreach(p => {
            // Each record is "<phone number>\t<latitude>,<longitude>"
            val arr = p.split("\\t")
            val phoneno = arr(0)
            val jingwei = arr(1)
            val arrjingwei = jingwei.split(",")
            stmt.setLong(1, new Date().getTime)
            stmt.setDouble(2, java.lang.Double.parseDouble(arrjingwei(0).trim))
            stmt.setDouble(3, java.lang.Double.parseDouble(arrjingwei(1).trim))
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      // A single partition means a single database connection per batch
      val repartitionedRDD = rdd.repartition(1)
      repartitionedRDD.foreachPartition(func)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Package the program with the maven-shade-plugin.
Submit the jar to the Spark cluster:

spark-submit \
  --class com.neusoft.SparkDemo \
  --master spark://hadoop1:7077 \
  --jars mysql-connector-java-5.1.35-bin.jar \
  --driver-class-path mysql-connector-java-5.1.35-bin.jar \
  /root/demo.jar