Spark大数据开发

大数据开发:Spark SQL读取parquet文件操作

2021-06-08  本文已影响0人  成都加米谷大数据

Spark在支持大规模离线数据处理上,是极具优势性能的,而Spark框架的数据处理流程,首先就是引入数据源,其中比较常见的就是parquet文件,通过Spark SQL统一的接口去读取和写入数据。今天的大数据开发学习分享,我们就主要来讲讲,Spark SQL读取parquet文件操作。

1、读取Parquet文件

parquet文件自带schema,读取后是DataFrame格式。

val usersDF =spark.read.load("examples/src/main/resources/users.parquet")

//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]

2、解析分区信息

parquet文件中如果带有分区信息,那么SparkSQL会自动解析分区信息。比如,这样一份人口数据按照gender和country进行分区存储,目录结构如下:

test

└── spark-sql

    └── test

        ├──gender=male

       │   │

       │   ├── country=US

       │   │   └── data.parquet

       │   ├── country=CN

       │   │   └── data.parquet

       │   └── ...

        └──gender=female

            │

           ├── country=US

           │   └── data.parquet

           ├── country=CN

           │   └── data.parquet

           └── ...

通过spark.read.load读取该目录下的文件SparkSQL将自动解析分区信息,返回的DataFrame的Schema如下:

root

|-- name: string (nullable = true)

|-- age: long (nullable = true)

|-- gender: string (nullable = true)

|-- country: string (nullable = true)

目前自动解析分区支持数值类型和字符串类型。

自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。可以关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不会再进行类型解析。

3、Schema合并

如果读取的多个parquet文件中的Schema信息不一致,Spark SQL可以设置参数进行合并,但是Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。

可以通过下面两种方式开启该功能:

a.读取文件的时候,开启合并功能,只对本次读取文件进行合并Schema操作

b.设置全局SQL选项spark.sql.parquet.mergeSchema为true,每次读取文件都会进行合并Schema操作

具体示例:

// sqlContext是之前例子中生成的

// 导入隐式转换

import sqlContext.implicits._

// 创建一个简单的DataFrame并保存

val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// 创建另一个DataFrame,注意字段名

val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// 读取这两个parquet文件,增加开启合并Schema的设置

val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")

df3.printSchema()

// 不同名称的字段都保留下来了

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

关于schema合并,有一点需要特别注意,那就是当不同parquet文件的schema有冲突时,合并会失败,如同名的字段,其类型不一致的情况。这时如果你读取的是hive数据源,可能会出现读取失败或者读取字段值全部为NULL的情况。如果大家遇到类型场景,可以考虑是否是这个因素导致。

关于大数据开发学习,Spark SQL读取parquet文件操作,以上就为大家做了简单的介绍了。Parquet是Spark SQL常常需要打交道的一种数据源文件,对这个部分,要多去练习巩固。

上一篇 下一篇

猜你喜欢

热点阅读