大数据学习

Spark Sql外部数据源

2020-03-17  本文已影响0人  TZX_0710

Spark支持以下六个核心数据源,同时Spark提供了上百种数据源的读取方式。
CSV、JSON、Parquet、ORC、JDBC/ODBC、Plain-text files
SparkSql的StructField的dataType取值类型如下:
ArrayType, BinaryType, BooleanType, CalendarIntervalType, DateType, HiveStringType, MapType, NullType, NumericType, ObjectType, StringType, StructType, TimestampType
下图所需要的资源文件可以到百度云盘进行下载
链接:https://pan.baidu.com/s/196haN0lWD9Px3MoJ6ybXww
提取码:sgfs

  1. 读数据格式如下:
spark.read.format("csv")
.option("mode","FAILFAST")//读取模式
.option("inferSchema","true")//是否自动推断schema
.option("path","filePath")//文件路径
.schema(schema)//使用预先定义的schema
.load()
读取模式取值有3个取值:
1.permissive  当遇到损坏的记录时,将其所有字段设置为null,并将所有的损坏记录放在名为_conrruption
2. dropMalformed 删除格式不正确的行
3. failfast 遇到格式不正确的数据立即失败

2.写数据格式

dataframe.write.format("csv")//文件格式
.mode("OVERWRITE")//写模式
.option("dateFormat","yyyy-MM-dd")//日期格式
.option("path","filePath")//文件地址  
.save() 
写模式有4种可选项:
1.ErrorIfExists  如果给定的路径已经存在文件,则抛出异常。默认的模式
2. Append 数据以追加的方式写入
3. Ovverwrite 数据以覆盖的方式写入
4. Ignore  如果给定的路径已经存在文件,则忽略。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object spark_sqlsource {

  def main(args: Array[String]): Unit = {
    //spark数据源读取
   var spark=SparkSession.builder().appName("sparkSqlSource").master("local").getOrCreate()
   val dataFrame=spark.read.format("csv")
      .option("header","false")//文件的第一行是否为列的名称
      .option("mode","FAILFAST")//是否快速失败
      .option("inferSchema","true")//自动推断schema
      .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")
     .show()
    //预定义类型
    val deptSchema=StructType(Array(
      StructField("deptNo",LongType,nullable = false),
      StructField("dname",StringType,nullable = true),
      StructField("loc",StringType,nullable = true),
      StructField("dateTime",DateType,nullable = true)))

   val df=spark.read.format("csv")
     .option("mode","failfast")
      .schema(deptSchema)
      .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")

    df.show()

  //写入csv文件
    df.write.format("csv")
    .mode("OVERWRITE")//如果存在则重写
      //日期数据格式化写入
    .option("dateFormat","yyyy")
    .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")

    //读取json文件
    val dfjson=spark.read.format("json")
      .option("mode","failfast")
      .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
    //写入json文件
    dfjson.write.format("json")
      .mode(saveMode = "overwrite")
      .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")

    //读取parquet文件 parquet是一个开源的面向列的存储数据
    spark.read.format("parquet")
      .option("mode","failfast")
      .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.parquet")
      .show()
    //写入parquet文件
    dfjson.write.format("parquet")
      .mode("overwrite")
      .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\emp")
    //链接jdbc查询数据
    val sqlQuery= "(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"
    spark.read.format("jdbc")
      .option("url","jdbc:mysql://127.0.0.1:3306/mysql")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable",sqlQuery)//查询sql语句
      .option("numPartitions","10")//设置并行度
      .load()
      .show()
    //写入数据 向jdbc 并且创建表
   val djson= spark.read.format("json").option("mode","failfast")
      .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
    djson.write.format("jdbc")
      .mode("append")
      .option("url","jdbc:mysql://127.0.0.1:3306/mysql")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable","emp")
      .save()
  }
}
//写入文件的时候可能会报错
 (null) entry in command string: null chmod 0644
解决方案:把hadoop.dll  拷贝至C:\\windows\\system32目录下

image.png
写入文件

option参数配置表:

csv配置表
CSV配置表
json配置表
jdbc配置表

sparksql也有正常数据库操作的一些常用函数、如count、avg、distinct
当然也可以自定义函数 ,包括一些左连接等查询 下面是参考资料
spark-sql函数
spark-sql联接查询

上一篇 下一篇

猜你喜欢

热点阅读