Spark SQL (Part 2): DataFrame and Dataset
2018-03-18
Sx_Ren
- Dataset:
A Dataset is a distributed collection of data (available since Spark 1.6).
- DataFrame:
A DataFrame is a Dataset organized into named columns, that is, a distributed dataset in which every column has a name, a type, and values. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
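The relationship between the two is: DataFrame = Dataset[Row]. In the Scala API, DataFrame is simply a type alias for Dataset[Row].
The DataFrame concept was not invented by Spark SQL; it existed earlier in R and in Python's pandas library.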
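The relationship DataFrame = Dataset[Row] can be checked directly in code. The following is a minimal sketch, assuming a local session and some made-up sample data (the names `rows` and `firstName` are only for illustration); in spark-shell the `spark` value already exists.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session for experimentation.
val spark = SparkSession.builder().master("local[*]").appName("alias-sketch").getOrCreate()
import spark.implicits._

// Build a small DataFrame from in-memory tuples (hypothetical sample data).
val df: DataFrame = Seq(("Andy", 32L), ("Justin", 19L)).toDF("name", "age")

// Compiles without any conversion, because DataFrame is an alias of Dataset[Row].
val rows: Dataset[Row] = df

// Every element is a generic Row; fields are looked up by name and typed at runtime.
val firstName: String = rows.head().getAs[String]("name")
```

Because a Row is untyped, a wrong field name or type only fails at runtime, which is the main practical difference from a typed Dataset.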
How do you obtain a DataFrame? In Spark 1.x, SQLContext is the entry point:
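import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc) // sc is an existing SparkContext
val people = sqlContext.read.format("json").load(path) // people is a DataFrame; path points to a JSON file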
Since Spark 2.0, SparkSession replaces SQLContext as the entry point:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("examples/src/main/resources/people.json") // df is a DataFrame
df.show()
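The builder above does not set a master URL, which is fine under spark-shell or spark-submit but not when the code runs from an IDE or a plain script. The sketch below assumes local mode and, instead of the people.json file, reads the same kind of records from an in-memory Dataset[String], so it can be tried without the Spark source tree. The sample records and the application name are assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: run locally with all available cores.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("local-session-sketch")
  .getOrCreate()
import spark.implicits._

// One JSON object per element, mirroring the one-object-per-line layout of a JSON file.
val jsonLines = Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy","age":30}""",
  """{"name":"Justin","age":19}"""
).toDS()

// The schema (age: long, name: string) is inferred from the data.
val df = spark.read.json(jsonLines)
df.printSchema()
df.show()
```

A record without an age field simply yields null in that column.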
Common DataFrame operations include:
df.printSchema() // Print the schema in a tree format
df.select("name").show() // Select only the "name" column
df.select($"name", $"age" + 1).show() // Select everybody, but increment the age by 1
df.filter($"age" > 21).show() // Select people older than 21
df.groupBy("age").count().show() // Count people by age
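To see what these operations return without needing the JSON file, here is a small sketch on in-memory data. The sample rows and the column alias `age_plus_one` are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("ops-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data; None becomes a null age.
val df = Seq[(String, Option[Long])](
  ("Michael", None),
  ("Andy", Some(30L)),
  ("Justin", Some(19L))
).toDF("name", "age")

// Rows with a null age are dropped, because null > 21 does not evaluate to true.
val olderThan21 = df.filter($"age" > 21).select("name").as[String].collect()

// Give the computed column an explicit name instead of the generated "(age + 1)".
val agePlusOne = df.select($"name", ($"age" + 1).as("age_plus_one"))

// null forms its own group, so there is one group per distinct age plus one for null.
val countsByAge = df.groupBy("age").count()
```

Note that all of these are transformations that return new DataFrames; nothing is computed until an action such as show() or collect() is called.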
A DataFrame can also be registered as a temporary view, so that the underlying file can be queried with SQL statements:
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
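A temporary view is just another way to address the same DataFrame, so a SQL query and the equivalent DataFrame calls should give the same result. A minimal sketch, assuming made-up data and a hypothetical view name `people_sql_sketch`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sql-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val df = Seq(("Andy", 30L), ("Justin", 19L), ("Berta", 30L)).toDF("name", "age")
df.createOrReplaceTempView("people_sql_sketch")

// Aggregation expressed in SQL against the view ...
val bySql = spark.sql(
  "SELECT age, COUNT(*) AS n FROM people_sql_sketch GROUP BY age ORDER BY age")

// ... and the same aggregation expressed with the DataFrame API.
val byApi = df.groupBy("age").count().orderBy("age")

// Collect both as (age, count) pairs for comparison.
val sqlResult = bySql.collect().map(r => (r.getLong(0), r.getLong(1))).toSeq
val apiResult = byApi.collect().map(r => (r.getLong(0), r.getLong(1))).toSeq
```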
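A temporary view like the one above is session-scoped: it disappears when the session that created it terminates. To share a view across all sessions of the same Spark application, create a global temporary view. It is tied to the system database global_temp, must be referenced by its qualified name (for example global_temp.people), and stays alive until the Spark application terminates:
// Register the DataFrame as a global temporary view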
df.createGlobalTempView("people")
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
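The difference in scope can be observed by querying both kinds of view from a second session. This is a sketch under assumptions: the view names are hypothetical, and the global temporary database is assumed to have its default name global_temp.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder().master("local[*]").appName("view-scope-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("Andy", 30L)).toDF("name", "age")

// Session-scoped view: visible only in the session that created it.
df.createOrReplaceTempView("people_local_sketch")
// Global view: visible to every session of this application.
df.createOrReplaceGlobalTempView("people_global_sketch")

// A second session on the same SparkContext.
val other = spark.newSession()

// Looking up the session-scoped view from the other session fails during analysis.
val localVisible = Try(other.sql("SELECT * FROM people_local_sketch").count()).isSuccess
// The global view resolves, as long as it is qualified with global_temp.
val globalVisible =
  Try(other.sql("SELECT * FROM global_temp.people_global_sketch").count()).isSuccess
```

Both views are gone once the application stops; neither is persisted to a metastore.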
How do you obtain a Dataset? As follows:
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
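What a Dataset adds over a DataFrame is compile-time typing: lambdas receive Person objects rather than generic Rows. A short sketch, assuming spark-shell or a Scala 2 script where the case class is defined at the top level, with made-up sample data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

// Hypothetical sample data.
val ds = Seq(Person("Andy", 32), Person("Justin", 19)).toDS()

// Typed operations: a misspelled field such as _.agee would not compile.
val adultNames = ds.filter(_.age > 21).map(_.name).collect()

// A Dataset can always be viewed as a DataFrame again; column names come from the fields.
val asDF = ds.toDF()
```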
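There are two ways to convert between RDDs and DataFrames:
- Inferring the Schema Using Reflection: uses reflection on a case class. Prerequisite: the fields and their types must be known in advance.
- Programmatically Specifying the Schema: builds the schema in code and uses Row objects. The code is more verbose; use it when the first approach cannot meet your needs (for example, when the columns are not known in advance).
Recommendation: prefer the first approach whenever it applies.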
- Inferring the Schema Using Reflection
case class Person(name: String, age: Long)
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
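The same reflection-based conversion can be tried without people.txt by parallelizing a few lines in memory. This is a sketch, not the official example: the sample lines and the view name `people_reflection_sketch` are assumptions, and the case class is assumed to live at the top level of spark-shell or a Scala 2 script.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("reflection-sketch").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

// Hypothetical stand-in for the lines of people.txt ("name, age").
val lines = spark.sparkContext.parallelize(Seq("Michael, 29", "Andy, 30", "Justin, 19"))

// The column names and types are derived from the Person fields by reflection.
val peopleDF = lines
  .map(_.split(","))
  .map(a => Person(a(0), a(1).trim.toLong))
  .toDF()

peopleDF.createOrReplaceTempView("people_reflection_sketch")

// Query the view and read a column back by field name.
val teenagers = spark.sql(
  "SELECT name, age FROM people_reflection_sketch WHERE age BETWEEN 13 AND 19")
val teenagerNames = teenagers.map(row => row.getAs[String]("name")).collect()
```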
- Programmatically Specifying the Schema
This approach takes more code and consists of three steps:
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
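One consequence of a schema generated from a string like this is that every column, including age, is a StringType. The sketch below repeats the three steps on in-memory data (the sample lines are an assumption) and then casts age to an integer so that numeric comparisons behave as expected.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()
import spark.implicits._

// Step 1: an RDD of Rows (hypothetical sample lines instead of people.txt).
val rowRDD = spark.sparkContext
  .parallelize(Seq("Michael, 29", "Andy, 30"))
  .map(_.split(","))
  .map(a => Row(a(0), a(1).trim))

// Step 2: a schema in which every field is a nullable string.
val schema = StructType(
  "name age".split(" ").map(f => StructField(f, StringType, nullable = true)))

// Step 3: apply the schema to the RDD of Rows.
val peopleDF = spark.createDataFrame(rowRDD, schema)

// age is still a string at this point; cast it to get a numeric column.
val typedDF = peopleDF.withColumn("age", $"age".cast(IntegerType))
val olderThan29 = typedDF.filter($"age" > 29).count()
```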
Besides the example above from the official documentation, here is another one:
// The code assumes each line of info.txt has the form: id,name,age
val rdd = spark.sparkContext.textFile("E:/ATempFile/info.txt")
// Convert each line to a Row whose value types match the schema below
val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType = StructType(Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
val infoDF = spark.createDataFrame(infoRDD, structType)
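In the example above, a single malformed line makes toInt throw and the whole job fail. The following variation is only a sketch of one way to guard against that: it skips lines that do not parse and builds the same schema with StructType's add method. The sample lines are made up, and silently dropping bad records is an assumption that may not suit every job.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.util.Try

val spark = SparkSession.builder().master("local[*]").appName("info-sketch").getOrCreate()

// Hypothetical stand-in for info.txt, including one malformed line.
val raw = spark.sparkContext.parallelize(Seq("1,zhangsan,30", "2,lisi,25", "bad line"))

// Keep only the lines that parse; a failed Try becomes an empty list and is dropped.
val infoRDD = raw
  .map(_.split(","))
  .flatMap(f => Try(Row(f(0).trim.toInt, f(1), f(2).trim.toInt)).toOption.toList)

// Same schema as above, written with the builder-style add method.
val structType = new StructType()
  .add("id", IntegerType, nullable = true)
  .add("name", StringType, nullable = true)
  .add("age", IntegerType, nullable = true)

val infoDF = spark.createDataFrame(infoRDD, structType)
```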
Clearly, the first approach (reflection) is more concise and convenient.