Spark-SQL

2020-05-13  本文已影响0人  安申

1.在新版本中,SparkSession是Spark最新的SQL查询起点,实质上是SQLContext和HiveContext的组合。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

2.创建DataFrame有三种方式

(1)通过Spark的数据源进行创建;

val df:DataFrame = spark.read.csv("./temp/aaa.csv")

(2)从一个存在的RDD进行转换;

注意:如果需要RDD与DF或者DS之间操作,那么都需要引入import spark.implicits._  (spark不是包名,而是sparkSession对象的名称)

(3)还可以从Hive Table进行查询返回。

val frame1:DataFrame = spark.table("tablename")

3.代码示例:

(1)添加依赖:

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-sql_2.11</artifactId>

    <version>2.1.1</version>

</dependency>

(2)object HelloWorld {

              def main(args: Array[String]) {

                //创建SparkConf()并设置App名称

                 val spark = SparkSession.builder().appName("HelloWorld").master("local[*]").getOrCreate()

                //导入隐式转换

                  import spark.implicits._

                //读取本地文件,创建DataFrame

                val df =spark.read.json("examples/src/main/resources/people.json")

                //打印

                df.show()

                //DSL风格:查询年龄在21岁以上的

                df.filter($"age"> 21).show()

                //创建临时表

                df.createOrReplaceTempView("persons")

                //SQL风格:查询年龄在21岁以上的

                spark.sql("SELECT * FROM persons where age > 21").show()

                //关闭连接

                spark.stop()

    }

}

4.Spark SQL数据的加载与保存

加载数据:spark.read.文件格式

保存数据:df.write.文件格式

5.Spark SQL通过JDBC连接MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

定义JDBC相关参数配置信息

val connectionProperties = new Properties()

connectionProperties.put("user","root")

connectionProperties.put("password","000000")

//使用read.jdbc加载数据

val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd","rddtable", connectionProperties)

//使用write.jdbc保存数据

jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql","db", connectionProperties)

Spark SQL相关语法

1.filter(condition):根据字段进行筛选

和where使用条件相同

jdbcDF .filter("id = 1 or c1 = 'b'" ).show()

即:过滤出来满足condition条件的,注意,是满足条件的数据被过滤出来

2.selectExpr:对指定字段进行特殊处理

可以直接对指定字段调用UDF函数,或者指定别名等。即,对传入的字段进行特殊处理,可以转换形式,取别名等。

示例,查询id字段,c3字段取别名time,c4字段四舍五入:

jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show()

ps:众所周知,取别名的时候,可以用as,也可以省略。即:"c3 as time","c3 time"都对

3.col:获取指定字段

只能获取一个字段,返回对象为Column类型。

val idCol = jdbcDF.col(“id”)

4.drop:去除指定字段保留其他字段

返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。

示例:jdbcDF.drop("id")

5.还有就是两个DS或DF进行join,其实最后的结果是两个DS或DF的所有字段

代码:

val spark = SparkSession.builder().appName("JoinDemo").master("local[*]").getOrCreate()

val context=spark.sqlContext

val data1=context.createDataFrame(List(("b","Bob",36))).toDF("id","name","age")

val data2 = context.createDataFrame(List(("staff","Bob",66666))).toDF("job","name","salary")

data1.join(data2,data1.col("name").equalTo(data2.col("name"))).show()

结果:

+---+----+---+-----+----+------+

| id|name|age|  job|name|salary|

+---+----+---+-----+----+------+

|  b| Bob| 36|staff| Bob| 66666|

+---+----+---+-----+----+------+

上一篇下一篇

猜你喜欢

热点阅读