SparkSql之编程方式

2021-07-19  本文已影响0人  万事万物

什么是SparkSql?

SparkSession

在老的版本中,SparkSQL提供两种SQL查询起始点:

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

引入依赖

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>

创建SparkSession

导包

import org.apache.spark.sql.SparkSession

SparkSession 构造器

@Stable
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState],
    @transient private val parentSessionState: Option[SessionState],
    @transient private[sql] val extensions: SparkSessionExtensions)
  extends Serializable with Closeable with Logging {...}

SparkSession 主构造器已被私有化,无法通过常规的new创建对象。在SparkSession伴生对象中,有个Builder类及builder方法

第一种方式:
创建Builder 对象获取SparkSession 实例

// 创建Builder实例
val builder = new spark.sql.SparkSession.Builder
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()

第二种方式:
通过SparkSession调用builder()函数获取Builder的实例

// 通过调用 builder() 获取 Builder实例
val builder: SparkSession.Builder = SparkSession.builder()
// 调用getOrCreate获取 SparkSession 实例
val session: SparkSession = builder.getOrCreate()

在使用SparkContext时 可以在SparkConf指定masterappName
如:

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)

Builder也是可以

val builder: SparkSession.Builder = SparkSession.builder()
builder.master("local[4]")
builder.appName("test")

创建好SparkSession就可以开始下面的工作了。


spark sql 编程有两种方式

声明式:SQL

使用声明式,需要注册成表注册成表的四种方式

示例:
注册成表;viewName指定表名

 df.createGlobalTempView(viewName="表名")

编写sql

sparksession.sql("sql语句")

案例:

@Test
  def sparkSqlBySql(): Unit ={
    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )

    // 导入隐式转换
    import sparkSession.implicits._

    val femaleDf: DataFrame = female.toDF()
    val boysDf: DataFrame = boys.toDF()

    //合并
    val unionAll=femaleDf.unionAll(boysDf)


    // 注册成表
    unionAll.createOrReplaceTempView(viewName = "student")


    //编写sql

    // 统计男女人数
    sparkSession.sql(
      """
        |select sex,count(*) sex_count from student
        |group by sex
        |""".stripMargin).show()

  }
+---+---------+
|sex|sex_count|
+---+---------+
| 男|        6|
| 女|        5|
+---+---------+

也可以支持开窗

    // 统计男女人数
    sparkSession.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from student
        |""".stripMargin).show()
+---+------+---+---+-------+---+
| id|  name|age|sex|classId| rn|
+---+------+---+---+-------+---+
|  8|  刘秀| 13| 男|      2|  1|
|  7|  张鹏| 14| 男|      1|  2|
|  1|  张三| 18| 男|      3|  3|
|  3|  李四| 18| 男|      2|  4|
|  4|  王五| 18| 男|      2|  5|
| 10|  乐乐| 21| 男|      1|  6|
|  2|  绣花| 16| 女|      1|  1|
|  5|  翠花| 19| 女|      2|  2|
|  9|王菲菲| 20| 女|      1|  3|
| 11|  小惠| 23| 女|      1|  4|
| 12|  梦雅| 25| 女|      3|  5|
+---+------+---+---+-------+---+

注意:使用createOrReplaceGlobalTempViewcreateGlobalTempView创建的表后续查询的时候必须通过 global_temp.表名 方式使用

    // 统计男女人数
    sparkSession.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
        |""".stripMargin).show()

    // 获取一个新的sparkSession
    val sparkSession2: SparkSession = sparkSession.newSession()
    sparkSession2.sql(
      """
        |select *,row_number() over(partition by sex order by age)as rn from global_temp.student
        |""".stripMargin).show()

结果都是一样,略...


命令式:DSL

通过算子操作数据
参考:https://blog.csdn.net/dabokele/article/details/52802150

DataFrame对象上Action操作

  1. show:展示数据
  2. collect:获取所有数据到数组
  3. collectAsList:获取所有数据到List
  4. describe(cols: String*):获取指定字段的统计信息
  5. first, head, take, takeAsList:获取若干行记录

DataFrame对象上的条件查询和join等操作

上一篇 下一篇

猜你喜欢

热点阅读