大数据学习

SparkSQL

2018-05-06  本文已影响5人  大数据技术宅

原文链接:SparkSQL—用之惜之

更多精彩内容请关注笔者公众号:大数据技术宅


SparkSql作为Spark的结构化数据处理模块,提供了非常强大的API,让分析人员用一次,就会为之倾倒,为之着迷,为之至死不渝。在内部,SparkSQL使用额外结构信息来执行额外的优化。在外部,可以使用SQL和DataSet 的API与之交互。本文笔者将带你走进SparkSql的世界,领略SparkSql之诸多妙处。

DataSet和DataFrame

当使用编程语言对结构化数据进行操作时候,SparkSql中返回的数据类型是DataSet/DataFrame,因此开篇笔者就先对这两种数据类型进行简单的介绍。

Dataset 是分布式的数据集合。是Spark 1.6中添加的一个新接口,是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作,数据集可以由JVM对象构造,然后使用函数转换(map、flatmap、filter等)进行操作。Dataset 支持Scala和javaAPI,不支持Python API。

DataFrame是由列组成的数据集,它在概念上等同于关系数据库中的表或R/Python中的data frame,但在查询引擎上进行了丰富的优化。DataFrame可以由各种各样的源构建,例如:结构化数据文件、hive中的表、外部数据库或现有的RDD。

SparkSQL基于DataFrame的操作

1importorg.apache.spark.sql.SparkSession

2val spark = SparkSession

3.builder()

4.appName("Spark SQL basic example")

5.getOrCreate()

6//引入Spark的隐式类型转换,如将RDD转换成 DataFrame

7importspark.implicits._

8val df = spark.read.json("/data/tmp/SparkSQL/people.json")

9df.show()//将DataFrame的内容进行标准输出

10//+---+-------+

11//|age|   name|

12//+---+-------+

13//|   |Michael|

14//| 19|   Andy|

15//| 30| Justin|

16//+---+-------+

17

18df.printSchema()//打印出DataFrame的表结构

19//root

20// |-- age: string (nullable = true)

21// |-- name: string (nullable = true)

22

23df.select("name").show()

24//类似于select name from DataFrame的SQL语句

25

26df.select($"name", $"age"+1).show()

27//类似于select name,age+1 from DataFrame的SQL语句

28//此处注意,如果对列进行操作,所有列名前都必须加上$符号

29

30df.filter($"age">21).show()

31//类似于select * from DataFrame where age>21 的SQL语句

32

33df.groupBy("age").count().show()

34//类似于select age,count(age) from DataFrame group by age;

35

36//同时也可以直接写SQL进行DataFrame数据的分析

37df.createOrReplaceTempView("people")

38val sqlDF = spark.sql("SELECT * FROM people")

39sqlDF.show()

SparkSQL基于DataSet的操作

由于DataSet吸收了RDD和DataFrame的优点,所有可以同时向操作RDD和DataFrame一样来操作DataSet。看下边一个简单的例子。

1caseclassPerson(name: String, age: Long)

2// 通过 case类创建DataSet

3val caseClassDS

= Seq(Person("Andy",32)).toDS()

4caseClassDS.show()

5// +----+---+

6// |name|age|

7// +----+---+

8// |Andy| 32|

9// +----+---+

10

11// 通过基本类型创建DataSet

12importing spark.implicits._

13val primitiveDS = Seq(1,2,3).toDS()

14primitiveDS.map(_ +1).collect()

15// Returns: Array(2, 3, 4)

16

17// 将DataFrames转换成DataSet

18val path ="examples/src/main/resources/people.json"

19val peopleDS = spark.read.json(path).as[Person]

20peopleDS.show()

21// +----+-------+

22// | age|   name|

23// +----+-------+

24// |null|Michael|

25// |  30|   Andy|

26// |  19| Justin|

27// +----+-------+

在上边的例子中能够发现DataSet的创建是非常简单的,但是笔者需要强调一点,DataSet是强类型的,也就是说DataSet的每一列都有指定的列标识符和数据类型。下边的列子将进一步介绍DataSet与RDD的交互。

1importspark.implicits._

2//将RDD转换成DataFrame

3val peopleDF = spark.sparkContext

4.textFile("examples/src/main/resources/people.txt")

5.map(_.split(","))

6.map(attributes=>Person(attributes(0),attributes(1).trim.toInt))

7.toDF()

8// 将RDD注册为一个临时视图

9peopleDF.createOrReplaceTempView("people")

10//对临时视图进行Sql查询

11val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

12

13// 对teenagersDF 对应的DataFrame进行RDD的算子map操作

14teenagersDF.map(teenager =>"Name: "+ teenager(0)).show()

15// +------------+

16// |       value|

17// +------------+

18// |Name: Justin|

19// +------------+

20

21// 与上一条语句效果一样

22teenagersDF.map(teenager =>"Name: "+ teenager.getAs[String]("name")).show()

23// +------------+

24// |       value|

25// +------------+

26// |Name: Justin|

27// +------------+

SparkSQL操作HIve表

Spark SQL支持读取和写入存储在Apache HIVE中的数据。然而,由于Hive具有大量的依赖关系,默认情况下这些依赖性不包含在Spark分布中。如果能在classpath路径找到Hive依赖文件,Spark将自动加载它们。另外需要注意的是,这些Hive依赖项须存在于所有Spark的Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。

 1importjava.io.File

 2importorg.apache.spark.sql.{Row, SaveMode, SparkSession}

 3

 4caseclassRecord(key: Int, value: String)

 5

 6// 设置hive数据库默认的路径

 7val warehouseLocation

=newFile("spark-warehouse").getAbsolutePath

 8

 9val spark = SparkSession

10.builder()

11.appName("Spark Hive Example")

12.config("spark.sql.warehouse.dir", warehouseLocation)

13.enableHiveSupport()

14.getOrCreate()

15

16importspark.implicits._

17importspark.sql

18

19//创建hive表,导入数据,并且查询数据

20sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")

21sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

22sql("SELECT * FROM src").show()

23

24// +---+-------+

25// |key|  value|

26// +---+-------+

27// |238|val_238|

28// | 86| val_86|

29// |311|val_311|

30// ...

31

32//对hive表数据进行聚合操作

33sql("SELECT COUNT(*) FROM src").show()

34// +--------+

35// |count(1)|

36// +--------+

37// |    500 |

38// +--------+

39

40// sql执行的查询结果返回DataFrame类型数据,支持常用的RDD操作

41val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

42val stringsDS = sqlDF.map {

43caseRow(key: Int, value: String)=> s"Key: $key, Value: $value"

44}

45stringsDS.show()

46// +--------------------+

47// |               value|

48// +--------------------+

49// |Key: 0, Value: val_0|

50// |Key: 0, Value: val_0|

51// |Key: 0, Value: val_0|

52// ...

53

54// 通过DataFrames创建一个临时视图val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))

55recordsDF.createOrReplaceTempView("records")

56

57// 查询操作可以将临时的视图与HIve表中数据进行关联查询

58sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

59// +---+------+---+------+

60// |key| value|key| value|

61// +---+------+---+------+

62// |  2| val_2|  2| val_2|

63// |  4| val_4|  4| val_4|

64// |  5| val_5|  5| val_5|

65// ...

66

67// 创建一个Hive表,并且以parquet格式存储数据

68sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")

69// 讲DataFrame中数据保存到Hive表里

70val df = spark.table("src")

71df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")

72sql("SELECT * FROM hive_records").show()

73// +---+-------+

74// |key|  value|

75// +---+-------+

76// |238|val_238|

77// | 86| val_86|

78// |311|val_311|

79// ...

80

81// 在指定路径创建一个Parquet文件并且写入数据

82val dataDir ="/tmp/parquet_data"

83spark.range(10).write.parquet(dataDir)

84// 创建HIve外部表

85sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")

86sql("SELECT * FROM hive_ints").show()

87// +---+

88// |key|

89// +---+

90// |  0|

91// |  1|

92// |  2|

93// ...

94

95// Turn on flag for Hive Dynamic Partitioning

96spark.sqlContext.setConf("hive.exec.dynamic.partition","true")

97spark.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")

98// 通过DataFrame的API创建HIve分区表

99df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

100sql("SELECT * FROM hive_part_tbl").show()

101// +-------+---+

102// |  value|key|

103// +-------+---+

104// |val_238|238|

105// | val_86| 86|

106// |val_311|311|

107// ...

108

109spark.stop()

当然SparkSql的操作远不止这些,它可以直接对文件快执行Sql查询,也可以通过JDBC连接到关系型数据库,对关系型数据库中的数据进行一些运算分析操作。如果读者感觉不过瘾,可以留言与笔者交流,也可以通过Spark官网查阅相关例子进行学习。下一篇关于Spark的文章,笔者将详细的介绍Spark的常用算子,以满足渴望进行数据分析的小伙伴们的求知的欲望。

上一篇 下一篇

猜你喜欢

热点阅读