Spark Sql外部数据源
2020-03-17 本文已影响0人
TZX_0710
Spark支持以下六个核心数据源,同时Spark提供了上百种数据源的读取方式。
CSV、JSON、Parquet、ORC、JDBC/ODBC、Plain-text files
SparkSql的StructField的dataType取值类型如下:
ArrayType, BinaryType, BooleanType, CalendarIntervalType, DateType, HiveStringType, MapType, NullType, NumericType, ObjectType, StringType, StructType, TimestampType
下图所需要的资源文件可以到百度云盘进行下载
链接:https://pan.baidu.com/s/196haN0lWD9Px3MoJ6ybXww
提取码:sgfs
- 读数据格式如下:
spark.read.format("csv")
.option("mode","FAILFAST")//读取模式
.option("inferSchema","true")//是否自动推断schema
.option("path","filePath")//文件路径
.schema(schema)//使用预先定义的schema
.load()
读取模式取值有3个取值:
1.permissive 当遇到损坏的记录时,将其所有字段设置为null,并将所有的损坏记录放在名为_conrruption
2. dropMalformed 删除格式不正确的行
3. failfast 遇到格式不正确的数据立即失败
2.写数据格式
dataframe.write.format("csv")//文件格式
.mode("OVERWRITE")//写模式
.option("dateFormat","yyyy-MM-dd")//日期格式
.option("path","filePath")//文件地址
.save()
写模式有4种可选项:
1.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常。默认的模式
2. Append 数据以追加的方式写入
3. Ovverwrite 数据以覆盖的方式写入
4. Ignore 如果给定的路径已经存在文件,则忽略。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
object spark_sqlsource {
def main(args: Array[String]): Unit = {
//spark数据源读取
var spark=SparkSession.builder().appName("sparkSqlSource").master("local").getOrCreate()
val dataFrame=spark.read.format("csv")
.option("header","false")//文件的第一行是否为列的名称
.option("mode","FAILFAST")//是否快速失败
.option("inferSchema","true")//自动推断schema
.load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")
.show()
//预定义类型
val deptSchema=StructType(Array(
StructField("deptNo",LongType,nullable = false),
StructField("dname",StringType,nullable = true),
StructField("loc",StringType,nullable = true),
StructField("dateTime",DateType,nullable = true)))
val df=spark.read.format("csv")
.option("mode","failfast")
.schema(deptSchema)
.load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")
df.show()
//写入csv文件
df.write.format("csv")
.mode("OVERWRITE")//如果存在则重写
//日期数据格式化写入
.option("dateFormat","yyyy")
.save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")
//读取json文件
val dfjson=spark.read.format("json")
.option("mode","failfast")
.load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
//写入json文件
dfjson.write.format("json")
.mode(saveMode = "overwrite")
.save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")
//读取parquet文件 parquet是一个开源的面向列的存储数据
spark.read.format("parquet")
.option("mode","failfast")
.load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.parquet")
.show()
//写入parquet文件
dfjson.write.format("parquet")
.mode("overwrite")
.save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\emp")
//链接jdbc查询数据
val sqlQuery= "(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"
spark.read.format("jdbc")
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","root")
.option("dbtable",sqlQuery)//查询sql语句
.option("numPartitions","10")//设置并行度
.load()
.show()
//写入数据 向jdbc 并且创建表
val djson= spark.read.format("json").option("mode","failfast")
.load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
djson.write.format("jdbc")
.mode("append")
.option("url","jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","root")
.option("dbtable","emp")
.save()
}
}
//写入文件的时候可能会报错
(null) entry in command string: null chmod 0644
解决方案:把hadoop.dll 拷贝至C:\\windows\\system32目录下
![]()
image.png
写入文件
option参数配置表:
csv配置表
CSV配置表
json配置表
jdbc配置表
sparksql也有正常数据库操作的一些常用函数、如count、avg、distinct
当然也可以自定义函数 ,包括一些左连接等查询 下面是参考资料
spark-sql函数
spark-sql联接查询