spark SQL 1.基本操作

2021-06-07  本文已影响0人  caster

1. 进化史

Spark SQL用于结构化数据处理。
Hive:SQL简化了MR操作(on HDFS) 。
Shark:基于Hive开发,更改了MR引擎,提升了SQL on HDFS的性能,即Hive on Spark。
Spark:移除Hive,独立为Spark SQL,封装RDD为DF,DS。

2. 优势:

整合SQL和spark编程,简化RDD编程;
统一方式连接数据源;
兼容Hive HQL;
支持JDBC/ODBC;

3. DataFrame/DataSet < --RDD

DataFrame:以RDD为基础的分布式数据集,带有schema元信息。懒执行,性能优于RDD,底层会进行优化操作。
DataSet:相比DataFrame,提供强类型,方便操作数据。
Spark1.0 => RDD
Spark1.3 => DataFrame
Spark1.6 => DataSet(后续主推)
DataFrame=DataSet[Row]

4. DataFrame构造

SparkSession创建和执行DF和SQL,创建DF方式:

读文件中数字类型数据默认为bigint≈Long,内存中获取数字类型数据默认为Int
将DF创建为视图用于查询:
df.createTempView():适用于于当前session且view名不可替换。
df.createOrReplaceTempView():适用于于当前session且可替换。
df.createGlobalTempView():适用于全局session,查询时view前加global_temp.

5. DataFrame的DSL

使用特定语法DSL,不需要创建view进行查询。

val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
//隐式转换依赖
import spark.implicits._
val df1 = spark.read.json("file/1.txt")
//输出df结果
df1.printSchema()
//调用RDD函数函数
df1.map(e => (e.getString(1),e.getLong(0)+ 1)).show()
df1.select("user","age").show()
//需要计算的话:每个列都需要使用引用,两种方式:
df1.select($"user",$"age"+1).show()
df1.select('user,'age+1).show()
//查询年龄大于10的
df1.filter('age>10).show()
//分组聚合
df1.groupBy("age").count().show()
spark.stop()

6. RDD和DataFrame转换

∵ RDD只关心数据,DF同时关心结构。
∴ RDD+schema=DF

val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
//需要引入转换用包
import spark.implicits._
//rdd转df
val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
//val df = rdd.toDF()//默认为value
val df = rdd.toDF("id")
df.show()
//df转rdd
val rdd1 = df.rdd
rdd1.foreach(println)
spark.stop()

7. DataSet

强类型数据集,需要提供类型信息(样例类...)。

object Test{

  case class Person(name:String,age:Long)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
    import  spark.implicits._

    val list=List(Person("a",10),Person("b",11))
    val ds = list.toDS()
    ds.show
    ds.printSchema()

    spark.stop()
  }

}

8. DataFrame和DataSet转换

case class Person(name:String,age:Long)
def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
  import  spark.implicits._

  val df = spark.read.json("file/1.txt")
  val ds = df.as[Person]
  ds.show()
  val df1 = ds.toDF()
}

9. RDD和DataSet转换

case class Person(name:String,age:Long)

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
  import  spark.implicits._
  val rdd1 = spark.sparkContext.makeRDD(List(Person("a", 10), Person("b", 11)))
  val ds = rdd1.toDS
  ds.rdd

  val rdd2 = spark.sparkContext.makeRDD(List(1,2,3))
  rdd2.toDS.show()//默认为value
}
上一篇下一篇

猜你喜欢

热点阅读