Spark SQL程序

2020-10-16  本文已影响0人  羋学僧

在IDEA中开发Spark SQL程序

1、指定Schema格式

数据文件:E:/RmDownloads/student.txt(每行一条记录,格式为"id name age",字段之间以单个空格分隔)


SparkSQLDemo1.scala
// Create a DataFrame by supplying an explicit schema.
object SparkSQLDemo1 {
  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  /**
   * Reads the student text file, applies an explicit (id, name, age) schema,
   * registers the result as the temporary view `student` and prints its rows.
   *
   * Assumes every input line holds three fields separated by a single space,
   * with numeric first and third fields; a line that does not fit makes the
   * job fail with an exception.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo1").getOrCreate()

    try {
      // Read the file and split every line into its fields
      val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

      // Describe the schema with a StructType
      val schema = StructType(
        List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)
        )
      )

      // Map each field array onto a Row that matches the schema
      val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))

      // Build the DataFrame
      val studentDF = spark.createDataFrame(rowRDD, schema)

      // Register the DataFrame as a table/view
      studentDF.createOrReplaceTempView("student")

      // Run the SQL; show() returns Unit, so there is nothing worth binding to a val
      spark.sql("select * from student").show()
    } finally {
      // Release resources even when the job above throws
      spark.stop()
    }
  }

}

2、使用case class

SparkSQLDemo2.scala

// Create a DataFrame by mapping the data onto a case class.
object SparkSQLDemo2 {
  import org.apache.spark.sql.SparkSession

  /**
   * Reads the student text file, maps every line onto a `Student`, converts
   * the RDD into a DataFrame, registers it as the temporary view `student`
   * and prints its rows.
   *
   * Assumes every input line holds three fields separated by a single space,
   * with numeric first and third fields; a line that does not fit makes the
   * job fail with an exception.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo2").getOrCreate()

    try {
      // Read the file and split every line into its fields
      val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

      // Associate the raw fields with the case class
      val dataRDD = studentRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

      // Bring the implicit conversions into scope; they provide toDF() on the RDD
      import spark.implicits._

      val studentDF = dataRDD.toDF()

      // Register the DataFrame as a table/view
      studentDF.createOrReplaceTempView("student")

      // Run the SQL and print the result
      spark.sql("select * from student").show()
    } finally {
      // Release resources even when the job above throws
      spark.stop()
    }
  }

}
/**
 * Case class that stands for the schema of one student record.
 *
 * `SparkSQLDemo2` builds one instance per input line and converts the
 * resulting RDD into a DataFrame with `toDF()`.
 *
 * @param stuID   student id
 * @param stuName student name
 * @param stuAge  student age
 */
final case class Student(stuID: Int, stuName: String, stuAge: Int)

3、将数据保存到数据库

SparkSQLDemo3.scala
在项目的lib目录中添加mysql-connector-java-5.1.40-bin.jar

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/** Builds a DataFrame with an explicit schema and saves the query result to MySQL over JDBC. */
object SparkSQLDemo3 {

  /**
   * Reads the student text file, applies an explicit (id, name, age) schema,
   * registers the result as the temporary view `students`, prints the query
   * result and writes it to the `students` table through JDBC.
   *
   * Assumes every input line holds three fields separated by a single space,
   * with numeric first and third fields; a line that does not fit makes the
   * job fail with an exception.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local import: java.util.Properties is not among the file-level imports
    import java.util.Properties

    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo3").getOrCreate()

    try {
      // Read the file and split every line into its fields
      val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

      // Describe the schema with a StructType
      val schema = StructType(
        List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)
        )
      )

      // Map each field array onto a Row that matches the schema
      val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))

      // Build the DataFrame
      val studentDF = spark.createDataFrame(rowRDD, schema)

      // Register the DataFrame as a table
      studentDF.createOrReplaceTempView("students")

      // Run the SQL
      val result = spark.sql("select * from students")

      // Print the result
      result.show()

      // Connection properties for saving the result to MySQL
      // NOTE(review): credentials are hard-coded for the demo; load them from
      // configuration or the environment in real code.
      val mysqlprops = new Properties()

      // Class.forName("com.mysql.jdbc.Driver")

      mysqlprops.setProperty("user", "bigdata")
      mysqlprops.setProperty("password", "123456")

      // Write the result to the `students` table
      result.write.jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)

      // If the table already exists, write in append mode instead:
      //    result.write.mode("append").jdbc("jdbc:mysql://bigdata02:3306/sqoopdb","students",mysqlprops)
    } finally {
      // Stop the Spark context even when the job above throws
      spark.stop()
    }
  }
}


上一篇 下一篇

猜你喜欢

热点阅读